diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index ade3febc5e27..38efa3f6bf6e 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -42,7 +42,7 @@ pattern_ingester: enabled: true metric_aggregation: enabled: true - log_push_observations: true + loki_address: localhost:3100 ruler: alertmanager_url: http://localhost:9093 diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index d51fc86d5eb0..3840252f1df6 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -612,6 +612,206 @@ pattern_ingester: # CLI flag: -pattern-ingester.max-eviction-ratio [max_eviction_ratio: | default = 0.25] + # Configures the metric aggregation and storage behavior of the pattern + # ingester. + metric_aggregation: + # Whether the pattern ingester metric aggregation is enabled. + # CLI flag: -pattern-ingester.metric-aggregation.enabled + [enabled: | default = false] + + # How often to downsample metrics from raw push observations. + # CLI flag: -pattern-ingester.metric-aggregation.downsample-period + [downsample_period: | default = 10s] + + # The address of the Loki instance to push aggregated metrics to. + # CLI flag: -pattern-ingester.metric-aggregation.loki-address + [loki_address: | default = ""] + + # The timeout for writing to Loki. + # CLI flag: -pattern-ingester.metric-aggregation.timeout + [timeout: | default = 10s] + + # How long to wait in between pushes to Loki. + # CLI flag: -pattern-ingester.metric-aggregation.push-period + [push_period: | default = 30s] + + # The HTTP client configuration for pushing metrics to Loki. + http_client_config: + basic_auth: + [username: | default = ""] + + [username_file: | default = ""] + + [username_ref: | default = ""] + + [password: | default = ""] + + [password_file: | default = ""] + + [password_ref: | default = ""] + + authorization: + [type: | default = ""] + + [credentials: | default = ""] + + [credentials_file: | default = ""] + + [credentials_ref: | default = ""] + + oauth2: + [client_id: | default = ""] + + [client_secret: | default = ""] + + [client_secret_file: | default = ""] + + [client_secret_ref: | default = ""] + + [scopes: ] + + [token_url: | default = ""] + + [endpoint_params: ] + + tls_config: + [ca: | default = ""] + + [cert: | default = ""] + + [key: | default = ""] + + [ca_file: | default = ""] + + [cert_file: | default = ""] + + [key_file: | default = ""] + + [ca_ref: | default = ""] + + [cert_ref: | default = ""] + + [key_ref: | default = ""] + + [server_name: | default = ""] + + [insecure_skip_verify: ] + + [min_version: ] + + [max_version: ] + + proxy_url: + [url: ] + + [no_proxy: | default = ""] + + [proxy_from_environment: ] + + [proxy_connect_header: ] + + [bearer_token: | default = ""] + + [bearer_token_file: | default = ""] + + tls_config: + [ca: | default = ""] + + [cert: | default = ""] + + [key: | default = ""] + + [ca_file: | default = ""] + + [cert_file: | default = ""] + + [key_file: | default = ""] + + [ca_ref: | default = ""] + + [cert_ref: | default = ""] + + [key_ref: | default = ""] + + [server_name: | default = ""] + + [insecure_skip_verify: ] + + [min_version: ] + + [max_version: ] + + [follow_redirects: ] + + [enable_http2: ] + + proxy_url: + [url: ] + + [no_proxy: | default = ""] + + [proxy_from_environment: ] + + [proxy_connect_header: ] + + http_headers: + [: ] + + # Whether to use TLS for pushing metrics to Loki. 
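Taken together with the `loki-local-config.yaml` change above, a minimal sketch of the resulting YAML shape for this block. Only the values set in that file are shown as non-defaults; `downsample_period` and `push_period` appear at the defaults documented in this section, and the remaining fields (TLS, basic auth, backoff, tee) are documented below.

```yaml
pattern_ingester:
  enabled: true
  metric_aggregation:
    enabled: true
    # Loki instance that aggregated metrics are pushed back to
    # (matches the loki-local-config.yaml change above).
    loki_address: localhost:3100
    # Defaults shown explicitly for reference.
    downsample_period: 10s
    push_period: 30s
```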
+ # CLI flag: -pattern-ingester.metric-aggregation.tls + [use_tls: | default = false] + + # The basic auth configuration for pushing metrics to Loki. + basic_auth: + # Basic auth username for sending aggregations back to Loki. + # CLI flag: -pattern-ingester.metric-aggregation.basic-auth.username + [username: | default = ""] + + # Basic auth password for sending aggregations back to Loki. + # CLI flag: -pattern-ingester.metric-aggregation.basic-auth.password + [password: | default = ""] + + # The backoff configuration for pushing metrics to Loki. + backoff_config: + # Minimum delay when backing off. + # CLI flag: -pattern-ingester.metric-aggregation.backoff-min-period + [min_period: | default = 100ms] + + # Maximum delay when backing off. + # CLI flag: -pattern-ingester.metric-aggregation.backoff-max-period + [max_period: | default = 10s] + + # Number of times to backoff and retry before failing. + # CLI flag: -pattern-ingester.metric-aggregation.backoff-retries + [max_retries: | default = 10] + + # Configures the pattern tee which forwards requests to the pattern ingester. + tee_config: + # The size of the batch of raw logs to send for template mining + # CLI flag: -pattern-ingester.tee.batch-size + [batch_size: | default = 5000] + + # The max time between batches of raw logs to send for template mining + # CLI flag: -pattern-ingester.tee.batch-flush-interval + [batch_flush_interval: | default = 1s] + + # The number of log flushes to queue before dropping + # CLI flag: -pattern-ingester.tee.flush-queue-size + [flush_queue_size: | default = 1000] + + # the number of concurrent workers sending logs to the template service + # CLI flag: -pattern-ingester.tee.flush-worker-count + [flush_worker_count: | default = 100] + + # The max time we will try to flush any remaining logs to be mined when the + # service is stopped + # CLI flag: -pattern-ingester.tee.stop-flush-timeout + [stop_flush_timeout: | default = 30s] + + # Timeout for connections between the Loki and the pattern ingester. + # CLI flag: -pattern-ingester.connection-timeout + [connection_timeout: | default = 2s] + # The index_gateway block configures the Loki index gateway server, responsible # for serving index queries without the need to constantly interact with the # object store. diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index cd9524a9168e..f88d7424c5ee 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -60,16 +60,6 @@ const ( ringKey = "distributor" ringAutoForgetUnhealthyPeriods = 2 - - levelLabel = "detected_level" - logLevelDebug = "debug" - logLevelInfo = "info" - logLevelWarn = "warn" - logLevelError = "error" - logLevelFatal = "fatal" - logLevelCritical = "critical" - logLevelTrace = "trace" - logLevelUnknown = "unknown" ) var ( @@ -406,9 +396,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } else { logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata) } - if logLevel != logLevelUnknown && logLevel != "" { + if logLevel != constants.LogLevelUnknown && logLevel != "" { entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{ - Name: levelLabel, + Name: constants.LevelLabel, Value: logLevel, }) } @@ -883,24 +873,24 @@ func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels. 
if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" { otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt) if err != nil { - return logLevelInfo + return constants.LogLevelInfo } if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) { - return logLevelUnknown + return constants.LogLevelUnknown } else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) { - return logLevelTrace + return constants.LogLevelTrace } else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) { - return logLevelDebug + return constants.LogLevelDebug } else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) { - return logLevelInfo + return constants.LogLevelInfo } else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) { - return logLevelWarn + return constants.LogLevelWarn } else if otlpSeverityNumber <= int(plog.SeverityNumberError4) { - return logLevelError + return constants.LogLevelError } else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) { - return logLevelFatal + return constants.LogLevelFatal } - return logLevelUnknown + return constants.LogLevelUnknown } return extractLogLevelFromLogLine(entry.Line) @@ -917,19 +907,19 @@ func extractLogLevelFromLogLine(log string) string { switch { case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")): - return logLevelTrace + return constants.LogLevelTrace case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")): - return logLevelDebug + return constants.LogLevelDebug case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")): - return logLevelInfo + return constants.LogLevelInfo case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")): - return logLevelWarn + return constants.LogLevelWarn case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")): - return logLevelError + return constants.LogLevelError case bytes.EqualFold(v, []byte("critical")): - return logLevelCritical + return constants.LogLevelCritical case bytes.EqualFold(v, []byte("fatal")): - return logLevelFatal + return constants.LogLevelFatal default: return detectLevelFromLogLine(log) } @@ -984,21 +974,21 @@ func isJSON(line string) bool { func detectLevelFromLogLine(log string) string { if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") || strings.Contains(log, "info") || strings.Contains(log, "INFO") { - return logLevelInfo + return constants.LogLevelInfo } if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") || strings.Contains(log, "error") || strings.Contains(log, "ERROR") { - return logLevelError + return constants.LogLevelError } if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") || strings.Contains(log, "warning") || strings.Contains(log, "WARNING") { - return logLevelWarn + return constants.LogLevelWarn } if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") { - return logLevelCritical + return constants.LogLevelCritical } if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") { - return logLevelDebug + return constants.LogLevelDebug } - return logLevelUnknown + return constants.LogLevelUnknown } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index c21b1e2561cd..bcd289d3a3df 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -1485,8 +1485,8 @@ func Test_DetectLogLevels(t *testing.T) { require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) 
require.Equal(t, push.LabelsAdapter{ { - Name: levelLabel, - Value: logLevelWarn, + Name: constants.LevelLabel, + Value: constants.LogLevelWarn, }, }, topVal.Streams[0].Entries[0].StructuredMetadata) }) @@ -1502,8 +1502,8 @@ func Test_DetectLogLevels(t *testing.T) { require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels) sm := topVal.Streams[0].Entries[0].StructuredMetadata require.Len(t, sm, 1) - require.Equal(t, sm[0].Name, levelLabel) - require.Equal(t, sm[0].Value, logLevelDebug) + require.Equal(t, sm[0].Name, constants.LevelLabel) + require.Equal(t, sm[0].Value, constants.LogLevelDebug) }) t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) { @@ -1514,7 +1514,7 @@ func Test_DetectLogLevels(t *testing.T) { writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{ { Name: "severity", - Value: logLevelWarn, + Value: constants.LogLevelWarn, }, } _, err := distributors[0].Push(ctx, writeReq) @@ -1525,10 +1525,10 @@ func Test_DetectLogLevels(t *testing.T) { require.Equal(t, push.LabelsAdapter{ { Name: "severity", - Value: logLevelWarn, + Value: constants.LogLevelWarn, }, { - Name: levelLabel, - Value: logLevelWarn, + Name: constants.LevelLabel, + Value: constants.LogLevelWarn, }, }, sm) }) @@ -1551,7 +1551,7 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) { }, }, }, - expectedLogLevel: logLevelDebug, + expectedLogLevel: constants.LogLevelDebug, }, { name: "invalid severity number should not cause any issues", @@ -1563,126 +1563,126 @@ func Test_detectLogLevelFromLogEntry(t *testing.T) { }, }, }, - expectedLogLevel: logLevelInfo, + expectedLogLevel: constants.LogLevelInfo, }, { name: "non otlp without any of the log level keywords in log line", entry: logproto.Entry{ Line: "foo", }, - expectedLogLevel: logLevelUnknown, + expectedLogLevel: constants.LogLevelUnknown, }, { name: "non otlp with log level keywords in log line", entry: logproto.Entry{ Line: "this is a warning log", }, - expectedLogLevel: logLevelWarn, + expectedLogLevel: constants.LogLevelWarn, }, { name: "json log line with an error", entry: logproto.Entry{ Line: `{"foo":"bar","msg":"message with keyword error but it should not get picked up","level":"critical"}`, }, - expectedLogLevel: logLevelCritical, + expectedLogLevel: constants.LogLevelCritical, }, { name: "json log line with an error", entry: logproto.Entry{ Line: `{"FOO":"bar","MSG":"message with keyword error but it should not get picked up","LEVEL":"Critical"}`, }, - expectedLogLevel: logLevelCritical, + expectedLogLevel: constants.LogLevelCritical, }, { name: "json log line with an warning", entry: logproto.Entry{ Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"warn"}`, }, - expectedLogLevel: logLevelWarn, + expectedLogLevel: constants.LogLevelWarn, }, { name: "json log line with an warning", entry: logproto.Entry{ Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","SEVERITY":"FATAL"}`, }, - expectedLogLevel: logLevelFatal, + expectedLogLevel: constants.LogLevelFatal, }, { name: "json log line with an error in block case", entry: logproto.Entry{ Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"ERR"}`, }, - expectedLogLevel: logLevelError, + expectedLogLevel: constants.LogLevelError, }, { name: "json log line with an INFO in block case", entry: logproto.Entry{ Line: `{"foo":"bar","msg":"message with keyword INFO get picked up"}`, }, - expectedLogLevel: 
logLevelInfo, + expectedLogLevel: constants.LogLevelInfo, }, { name: "logfmt log line with an INFO and not level returns info log level", entry: logproto.Entry{ Line: `foo=bar msg="message with info and not level should get picked up"`, }, - expectedLogLevel: logLevelInfo, + expectedLogLevel: constants.LogLevelInfo, }, { name: "logfmt log line with a warn", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword error but it should not get picked up" level=warn`, }, - expectedLogLevel: logLevelWarn, + expectedLogLevel: constants.LogLevelWarn, }, { name: "logfmt log line with a warn with camel case", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Warn`, }, - expectedLogLevel: logLevelWarn, + expectedLogLevel: constants.LogLevelWarn, }, { name: "logfmt log line with a trace", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Trace`, }, - expectedLogLevel: logLevelTrace, + expectedLogLevel: constants.LogLevelTrace, }, { name: "logfmt log line with some other level returns unknown log level", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword but it should not get picked up" level=NA`, }, - expectedLogLevel: logLevelUnknown, + expectedLogLevel: constants.LogLevelUnknown, }, { name: "logfmt log line with label Severity is allowed for level detection", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword but it should not get picked up" severity=critical`, }, - expectedLogLevel: logLevelCritical, + expectedLogLevel: constants.LogLevelCritical, }, { name: "logfmt log line with label Severity with camelcase is allowed for level detection", entry: logproto.Entry{ Line: `Foo=bar MSG="Message with keyword but it should not get picked up" Severity=critical`, }, - expectedLogLevel: logLevelCritical, + expectedLogLevel: constants.LogLevelCritical, }, { name: "logfmt log line with a info with non standard case", entry: logproto.Entry{ Line: `foo=bar msg="message with keyword error but it should not get picked up" level=inFO`, }, - expectedLogLevel: logLevelInfo, + expectedLogLevel: constants.LogLevelInfo, }, { name: "logfmt log line with a info with non block case for level", entry: logproto.Entry{ Line: `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO`, }, - expectedLogLevel: logLevelInfo, + expectedLogLevel: constants.LogLevelInfo, }, } { t.Run(tc.name, func(t *testing.T) { @@ -1707,7 +1707,7 @@ func Benchmark_extractLogLevelFromLogLine(b *testing.B) { for i := 0; i < b.N; i++ { level := extractLogLevelFromLogLine(logLine) - require.Equal(b, logLevelUnknown, level) + require.Equal(b, constants.LogLevelUnknown, level) } } @@ -1716,7 +1716,7 @@ func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) { for i := 0; i < b.N; i++ { level := extractLogLevelFromLogLine(logLine) - require.Equal(b, logLevelError, level) + require.Equal(b, constants.LogLevelError, level) } } @@ -1725,6 +1725,6 @@ func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) { for i := 0; i < b.N; i++ { level := extractLogLevelFromLogLine(logLine) - require.Equal(b, logLevelInfo, level) + require.Equal(b, constants.LogLevelInfo, level) } } diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index b4f730a58a7f..fedbc6e8fbc0 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -158,6 +158,11 @@ func (v Validator) ValidateLabels(ctx validationContext, ls labels.Labels, strea return 
fmt.Errorf(validation.MissingLabelsErrorMsg) } + // Skip validation for aggregated metric streams, as we create those for internal use + if ls.Has(push.AggregatedMetricLabel) { + return nil + } + numLabelNames := len(ls) // This is a special case that's often added by the Loki infrastructure. It may result in allowing one extra label // if incoming requests already have a service_name diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index a9b174952f28..e048546fb408 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -10,8 +10,6 @@ import ( "net/http" "time" - "github.com/grafana/loki/v3/pkg/logql/syntax" - "github.com/go-kit/log/level" "github.com/grafana/loki/pkg/push" @@ -27,6 +25,7 @@ import ( "github.com/grafana/loki/v3/pkg/analytics" "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/util/unmarshal" @@ -40,18 +39,18 @@ var ( Namespace: constants.Loki, Name: "distributor_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant. Includes structured metadata bytes.", - }, []string{"tenant", "retention_hours"}) + }, []string{"tenant", "retention_hours", "aggregated_metric"}) structuredMetadataBytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_structured_metadata_bytes_received_total", Help: "The total number of uncompressed bytes received per tenant for entries' structured metadata", - }, []string{"tenant", "retention_hours"}) + }, []string{"tenant", "retention_hours", "aggregated_metric"}) linesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: constants.Loki, Name: "distributor_lines_received_total", Help: "The total number of lines received per tenant", - }, []string{"tenant"}) + }, []string{"tenant", "aggregated_metric"}) bytesReceivedStats = analytics.NewCounter("distributor_bytes_received") structuredMetadataBytesReceivedStats = analytics.NewCounter("distributor_structured_metadata_bytes_received") @@ -59,9 +58,10 @@ var ( ) const ( - applicationJSON = "application/json" - LabelServiceName = "service_name" - ServiceUnknown = "unknown_service" + applicationJSON = "application/json" + LabelServiceName = "service_name" + ServiceUnknown = "unknown_service" + AggregatedMetricLabel = "__aggregated_metric__" ) type TenantsRetention interface { @@ -83,8 +83,10 @@ func (EmptyLimits) DiscoverServiceName(string) []string { return nil } -type RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) -type RequestParserWrapper func(inner RequestParser) RequestParser +type ( + RequestParser func(userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, tracker UsageTracker) (*logproto.PushRequest, *Stats, error) + RequestParserWrapper func(inner RequestParser) RequestParser +) type Stats struct { Errs []error @@ -100,6 +102,8 @@ type Stats struct { BodySize int64 // Extra is a place for a wrapped perser to record any interesting stats as key-value pairs to be logged Extra []any + + IsAggregatedMetric bool } func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRetention TenantsRetention, limits Limits, pushRequestParser RequestParser, tracker UsageTracker) (*logproto.PushRequest, error) { @@ -112,10 +116,12 @@ 
func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete entriesSize int64 structuredMetadataSize int64 ) + + isAggregatedMetric := fmt.Sprintf("%t", pushStats.IsAggregatedMetric) + for retentionPeriod, size := range pushStats.LogLinesBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) bytesReceivedStats.Inc(size) entriesSize += size } @@ -123,8 +129,8 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete for retentionPeriod, size := range pushStats.StructuredMetadataBytes { retentionHours := RetentionPeriodToString(retentionPeriod) - structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) - bytesIngested.WithLabelValues(userID, retentionHours).Add(float64(size)) + structuredMetadataBytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) + bytesIngested.WithLabelValues(userID, retentionHours, isAggregatedMetric).Add(float64(size)) bytesReceivedStats.Inc(size) structuredMetadataBytesReceivedStats.Inc(size) @@ -134,7 +140,7 @@ func ParseRequest(logger log.Logger, userID string, r *http.Request, tenantsRete // incrementing tenant metrics if we have a tenant. if pushStats.NumLines != 0 && userID != "" { - linesIngested.WithLabelValues(userID).Add(float64(pushStats.NumLines)) + linesIngested.WithLabelValues(userID, isAggregatedMetric).Add(float64(pushStats.NumLines)) } linesReceivedStats.Inc(pushStats.NumLines) @@ -237,7 +243,11 @@ func ParseLokiRequest(userID string, r *http.Request, tenantsRetention TenantsRe return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) } - if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 { + if lbs.Has(AggregatedMetricLabel) { + pushStats.IsAggregatedMetric = true + } + + if !lbs.Has(LabelServiceName) && len(discoverServiceName) > 0 && !pushStats.IsAggregatedMetric { serviceName := ServiceUnknown for _, labelName := range discoverServiceName { if labelVal := lbs.Get(labelName); labelVal != "" { diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 0484afe31c3b..80e7c5e7eead 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -60,6 +60,7 @@ func TestParseRequest(t *testing.T) { expectedLines int expectedBytesUsageTracker map[string]float64 expectedLabels labels.Labels + aggregatedMetric bool }{ { path: `/loki/api/v1/push`, @@ -228,6 +229,18 @@ func TestParseRequest(t *testing.T) { expectedBytesUsageTracker: map[string]float64{`{foo="bar2"}`: float64(len("fizzbuss"))}, expectedLabels: labels.FromStrings("foo", "bar2", LabelServiceName, ServiceUnknown), }, + { + path: `/loki/api/v1/push`, + body: `{"streams": [{ "stream": { "__aggregated_metric__": "stuff", "foo": "bar2", "job": "stuff" }, "values": [ [ "1570818238000000000", "fizzbuzz" ] ] }]}`, + contentType: `application/json`, + valid: true, + enableServiceDiscovery: true, + expectedBytes: len("fizzbuzz"), + expectedLines: 1, + expectedBytesUsageTracker: map[string]float64{`{__aggregated_metric__="stuff", foo="bar2", job="stuff"}`: float64(len("fizzbuss"))}, + expectedLabels: labels.FromStrings("__aggregated_metric__", "stuff", "foo", "bar2", "job", "stuff"), + aggregatedMetric: true, + }, } { t.Run(fmt.Sprintf("test %d", index), func(t *testing.T) { structuredMetadataBytesIngested.Reset() @@ -259,9 +272,32 @@ func 
TestParseRequest(t *testing.T) { require.Equal(t, test.expectedBytes, bytesReceived) require.Equalf(t, tracker.Total(), float64(bytesReceived), "tracked usage bytes must equal bytes received metric") require.Equal(t, test.expectedLines, linesReceived) - require.Equal(t, float64(test.expectedStructuredMetadataBytes), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(test.expectedBytes), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(test.expectedLines), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.Equal( + t, + float64(test.expectedStructuredMetadataBytes), + testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric))), + ) + require.Equal( + t, + float64(test.expectedBytes), + testutil.ToFloat64( + bytesIngested.WithLabelValues( + "fake", + "", + fmt.Sprintf("%t", test.aggregatedMetric), + ), + ), + ) + require.Equal( + t, + float64(test.expectedLines), + testutil.ToFloat64( + linesIngested.WithLabelValues( + "fake", + fmt.Sprintf("%t", test.aggregatedMetric), + ), + ), + ) require.Equal(t, test.expectedLabels.String(), data.Streams[0].Labels) require.InDeltaMapValuesf(t, test.expectedBytesUsageTracker, tracker.receivedBytes, 0.0, "%s != %s", test.expectedBytesUsageTracker, tracker.receivedBytes) } else { @@ -270,9 +306,9 @@ func TestParseRequest(t *testing.T) { require.Equal(t, 0, structuredMetadataBytesReceived) require.Equal(t, 0, bytesReceived) require.Equal(t, 0, linesReceived) - require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", ""))) - require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake"))) + require.Equal(t, float64(0), testutil.ToFloat64(structuredMetadataBytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) + require.Equal(t, float64(0), testutil.ToFloat64(bytesIngested.WithLabelValues("fake", "", fmt.Sprintf("%t", test.aggregatedMetric)))) + require.Equal(t, float64(0), testutil.ToFloat64(linesIngested.WithLabelValues("fake", fmt.Sprintf("%t", test.aggregatedMetric)))) } }) } diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index be80e71e9352..01074ddf8041 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -353,7 +353,7 @@ type Loki struct { IngesterRF1 ingester_rf1.Interface IngesterRF1RingClient *ingester_rf1.RingClient PatternIngester *pattern.Ingester - PatternRingClient *pattern.RingClient + PatternRingClient pattern.RingClient Querier querier.Querier cacheGenerationLoader queryrangebase.CacheGenNumberLoader querierAPI *querier.QuerierAPI @@ -704,8 +704,9 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(QuerySchedulerRing, t.initQuerySchedulerRing, modules.UserInvisibleModule) mm.RegisterModule(Analytics, t.initAnalytics) mm.RegisterModule(CacheGenerationLoader, t.initCacheGenerationLoader) - mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(PatternRingClient, t.initPatternRingClient, modules.UserInvisibleModule) + mm.RegisterModule(PatternIngesterTee, t.initPatternIngesterTee, modules.UserInvisibleModule) + mm.RegisterModule(PatternIngester, t.initPatternIngester) mm.RegisterModule(Metastore, t.initMetastore) mm.RegisterModule(MetastoreClient, t.initMetastoreClient, modules.UserInvisibleModule) @@ -721,7 +722,7 @@ func (t *Loki) 
setupModuleManager() error { Overrides: {RuntimeConfig}, OverridesExporter: {Overrides, Server}, TenantConfigs: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, IngesterRF1RingClient, Analytics}, + Distributor: {Ring, Server, Overrides, TenantConfigs, PatternRingClient, PatternIngesterTee, IngesterRF1RingClient, Analytics}, Store: {Overrides, IndexGatewayRing}, IngesterRF1: {Store, Server, MemberlistKV, TenantConfigs, MetastoreClient, Analytics}, Ingester: {Store, Server, MemberlistKV, TenantConfigs, Analytics}, @@ -739,8 +740,9 @@ func (t *Loki) setupModuleManager() error { BloomPlanner: {Server, BloomStore, Analytics, Store}, BloomBuilder: {Server, BloomStore, Analytics, Store}, BloomStore: {IndexGatewayRing}, - PatternIngester: {Server, MemberlistKV, Analytics}, PatternRingClient: {Server, MemberlistKV, Analytics}, + PatternIngesterTee: {Server, MemberlistKV, Analytics, PatternRingClient}, + PatternIngester: {Server, MemberlistKV, Analytics, PatternRingClient, PatternIngesterTee}, IngesterRF1RingClient: {Server, MemberlistKV, Analytics}, Metastore: {Server, MetastoreClient}, IngesterQuerier: {Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 5e279845dfd3..60e2683b599f 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -111,6 +111,7 @@ const ( IngesterRF1 string = "ingester-rf1" IngesterRF1RingClient string = "ingester-rf1-ring-client" PatternIngester string = "pattern-ingester" + PatternIngesterTee string = "pattern-ingester-tee" PatternRingClient string = "pattern-ring-client" IngesterQuerier string = "ingester-querier" IngesterGRPCInterceptors string = "ingester-query-tags-interceptors" @@ -333,13 +334,6 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) { } func (t *Loki) initDistributor() (services.Service, error) { - if t.Cfg.Pattern.Enabled { - patternTee, err := pattern.NewTee(t.Cfg.Pattern, t.PatternRingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger) - if err != nil { - return nil, err - } - t.Tee = distributor.WrapTee(t.Tee, patternTee) - } if t.Cfg.IngesterRF1.Enabled { rf1Tee, err := ingester_rf1.NewTee(t.Cfg.IngesterRF1, t.IngesterRF1RingClient, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger) if err != nil { @@ -714,7 +708,13 @@ func (t *Loki) initPatternIngester() (_ services.Service, err error) { return nil, nil } t.Cfg.Pattern.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort - t.PatternIngester, err = pattern.New(t.Cfg.Pattern, t.Cfg.MetricsNamespace, prometheus.DefaultRegisterer, util_log.Logger) + t.PatternIngester, err = pattern.New( + t.Cfg.Pattern, + t.PatternRingClient, + t.Cfg.MetricsNamespace, + prometheus.DefaultRegisterer, + util_log.Logger, + ) if err != nil { return nil, err } @@ -740,6 +740,41 @@ func (t *Loki) initPatternRingClient() (_ services.Service, err error) { return ringClient, nil } +func (t *Loki) initPatternIngesterTee() (services.Service, error) { + logger := util_log.Logger + + if !t.Cfg.Pattern.Enabled { + _ = level.Debug(logger).Log("msg", " pattern ingester tee service disabled") + return nil, nil + } + _ = level.Debug(logger).Log("msg", "initializing pattern ingester tee service...") + + svc, err := pattern.NewTeeService( + t.Cfg.Pattern, + t.PatternRingClient, + t.Cfg.MetricsNamespace, + prometheus.DefaultRegisterer, + logger, + ) + if err != nil { + return nil, err + } + + t.Tee = distributor.WrapTee(t.Tee, svc) + + return services.NewBasicService( + svc.Start, + func(_ context.Context) 
error { + svc.WaitUntilDone() + return nil + }, + func(_ error) error { + svc.WaitUntilDone() + return nil + }, + ), nil +} + func (t *Loki) initTableManager() (services.Service, error) { level.Warn(util_log.Logger).Log("msg", "table manager is deprecated. Consider migrating to tsdb index which relies on a compactor instead.") diff --git a/pkg/pattern/aggregation/config.go b/pkg/pattern/aggregation/config.go new file mode 100644 index 000000000000..b88eb8499ca7 --- /dev/null +++ b/pkg/pattern/aggregation/config.go @@ -0,0 +1,107 @@ +package aggregation + +import ( + "flag" + "time" + + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" +) + +type Config struct { + // TODO(twhitney): This needs to be a per-tenant config + Enabled bool `yaml:"enabled,omitempty" doc:"description=Whether the pattern ingester metric aggregation is enabled."` + DownsamplePeriod time.Duration `yaml:"downsample_period"` + LokiAddr string `yaml:"loki_address,omitempty" doc:"description=The address of the Loki instance to push aggregated metrics to."` + WriteTimeout time.Duration `yaml:"timeout,omitempty" doc:"description=The timeout for writing to Loki."` + PushPeriod time.Duration `yaml:"push_period,omitempty" doc:"description=How long to wait in between pushes to Loki."` + HTTPClientConfig config.HTTPClientConfig `yaml:"http_client_config,omitempty" doc:"description=The HTTP client configuration for pushing metrics to Loki."` + UseTLS bool `yaml:"use_tls,omitempty" doc:"description=Whether to use TLS for pushing metrics to Loki."` + BasicAuth BasicAuth `yaml:"basic_auth,omitempty" doc:"description=The basic auth configuration for pushing metrics to Loki."` + BackoffConfig backoff.Config `yaml:"backoff_config,omitempty" doc:"description=The backoff configuration for pushing metrics to Loki."` +} + +// RegisterFlags registers pattern ingester related flags. +func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { + cfg.RegisterFlagsWithPrefix(fs, "") +} + +func (cfg *Config) RegisterFlagsWithPrefix(fs *flag.FlagSet, prefix string) { + fs.BoolVar( + &cfg.Enabled, + prefix+"metric-aggregation.enabled", + false, + "Flag to enable or disable metric aggregation.", + ) + fs.DurationVar( + &cfg.DownsamplePeriod, + prefix+"metric-aggregation.downsample-period", + 10*time.Second, + "How often to downsample metrics from raw push observations.", + ) + fs.StringVar( + &cfg.LokiAddr, + prefix+"metric-aggregation.loki-address", + "", + "Loki address to send aggregated metrics to.", + ) + fs.DurationVar( + &cfg.WriteTimeout, + prefix+"metric-aggregation.timeout", + 10*time.Second, + "How long to wait write response from Loki", + ) + fs.DurationVar( + &cfg.PushPeriod, + prefix+"metric-aggregation.push-period", + 30*time.Second, + "How long to wait write response from Loki", + ) + fs.BoolVar( + &cfg.UseTLS, + prefix+"metric-aggregation.tls", + false, + "Does the loki connection use TLS?", + ) + + cfg.BackoffConfig.RegisterFlagsWithPrefix(prefix+"metric-aggregation", fs) + cfg.BasicAuth.RegisterFlagsWithPrefix(prefix+"metric-aggregation.", fs) +} + +// BasicAuth contains basic HTTP authentication credentials. 
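For reference, a minimal standalone sketch (not part of this PR) of how these flags could be registered and parsed in isolation, assuming the `aggregation` package path added above. The flag names follow from `RegisterFlagsWithPrefix` with the `pattern-ingester.` prefix that the ingester config uses.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/pattern/aggregation"
)

func main() {
	fs := flag.NewFlagSet("example", flag.ContinueOnError)

	var cfg aggregation.Config
	// Registers -pattern-ingester.metric-aggregation.* flags,
	// including the basic-auth and backoff sub-configs.
	cfg.RegisterFlagsWithPrefix(fs, "pattern-ingester.")

	// Illustrative values only; the address mirrors loki-local-config.yaml.
	if err := fs.Parse([]string{
		"-pattern-ingester.metric-aggregation.enabled=true",
		"-pattern-ingester.metric-aggregation.loki-address=localhost:3100",
		"-pattern-ingester.metric-aggregation.basic-auth.username=loki",
		"-pattern-ingester.metric-aggregation.basic-auth.password=secret",
	}); err != nil {
		panic(err)
	}

	// PushPeriod keeps its 30s default since it was not overridden.
	fmt.Println(cfg.Enabled, cfg.LokiAddr, cfg.PushPeriod) // true localhost:3100 30s
}
```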
+type BasicAuth struct { + Username string `yaml:"username" json:"username"` + // UsernameFile string `yaml:"username_file,omitempty" json:"username_file,omitempty"` + Password config.Secret `yaml:"password,omitempty" json:"password,omitempty"` + // PasswordFile string `yaml:"password_file,omitempty" json:"password_file,omitempty"` +} + +func (cfg *BasicAuth) RegisterFlagsWithPrefix(prefix string, fs *flag.FlagSet) { + fs.StringVar( + &cfg.Username, + prefix+"basic-auth.username", + "", + "Basic auth username for sending aggregations back to Loki.", + ) + fs.Var( + newSecretValue(config.Secret(""), &cfg.Password), + prefix+"basic-auth.password", + "Basic auth password for sending aggregations back to Loki.", + ) +} + +type secretValue string + +func newSecretValue(val config.Secret, p *config.Secret) *secretValue { + *p = val + return (*secretValue)(p) +} + +func (s *secretValue) Set(val string) error { + *s = secretValue(val) + return nil +} + +func (s *secretValue) Get() any { return string(*s) } + +func (s *secretValue) String() string { return string(*s) } diff --git a/pkg/pattern/aggregation/metrics.go b/pkg/pattern/aggregation/metrics.go new file mode 100644 index 000000000000..d777af50b813 --- /dev/null +++ b/pkg/pattern/aggregation/metrics.go @@ -0,0 +1,28 @@ +package aggregation + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ChunkMetrics struct { + chunks *prometheus.GaugeVec + samples *prometheus.CounterVec +} + +func NewChunkMetrics(r prometheus.Registerer, metricsNamespace string) *ChunkMetrics { + return &ChunkMetrics{ + chunks: promauto.With(r).NewGaugeVec(prometheus.GaugeOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metric_chunks", + Help: "The total number of chunks in memory.", + }, []string{"service_name"}), + samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metric_samples", + Help: "The total number of samples in memory.", + }, []string{"service_name"}), + } +} diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go new file mode 100644 index 000000000000..9aac2e3a5050 --- /dev/null +++ b/pkg/pattern/aggregation/push.go @@ -0,0 +1,329 @@ +package aggregation + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/url" + "sync" + "time" + + "github.com/dustin/go-humanize" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/golang/snappy" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + + "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/util/build" + + "github.com/grafana/dskit/backoff" + + "github.com/gogo/protobuf/proto" +) + +const ( + defaultContentType = "application/x-protobuf" + defaultMaxReponseBufferLen = 1024 + + pushEndpoint = "/loki/api/v1/push" +) + +var defaultUserAgent = fmt.Sprintf("pattern-ingester-push/%s", build.GetVersion().Version) + +type EntryWriter interface { + // WriteEntry handles sending the log to the output + // To maintain consistent log timing, Write is expected to be non-blocking + WriteEntry(ts time.Time, entry string, lbls labels.Labels) + Stop() +} + +// Push is a io.Writer, that writes given log entries by pushing +// directly to the given loki server URL. 
Each `Push` instance handles for a single tenant. +// No batching of log lines happens when sending to Loki. +type Push struct { + lokiURL string + tenantID string + httpClient *http.Client + userAgent string + contentType string + logger log.Logger + + // shutdown channels + quit chan struct{} + + // auth + username, password string + + // Will add these label to the logs pushed to loki + labelName, labelValue, streamName, streamValue string + + // push retry and backoff + backoff *backoff.Config + + entries entries +} + +type entry struct { + ts time.Time + entry string + labels labels.Labels +} + +type entries struct { + lock sync.Mutex + entries []entry +} + +func (e *entries) add(entry entry) { + e.lock.Lock() + defer e.lock.Unlock() + e.entries = append(e.entries, entry) +} + +func (e *entries) reset() []entry { + e.lock.Lock() + defer e.lock.Unlock() + entries := e.entries + e.entries = make([]entry, 0, len(entries)) + return entries +} + +// NewPush creates an instance of `Push` which writes logs directly to given `lokiAddr` +func NewPush( + lokiAddr, tenantID string, + timeout time.Duration, + pushPeriod time.Duration, + cfg config.HTTPClientConfig, + username, password string, + useTLS bool, + backoffCfg *backoff.Config, + logger log.Logger, +) (*Push, error) { + client, err := config.NewClientFromConfig(cfg, "pattern-ingester-push", config.WithHTTP2Disabled()) + if err != nil { + return nil, err + } + + client.Timeout = timeout + scheme := "http" + + // setup tls transport + if useTLS { + scheme = "https" + } + + u := url.URL{ + Scheme: scheme, + Host: lokiAddr, + Path: pushEndpoint, + } + + p := &Push{ + lokiURL: u.String(), + tenantID: tenantID, + httpClient: client, + userAgent: defaultUserAgent, + contentType: defaultContentType, + username: username, + password: password, + logger: logger, + quit: make(chan struct{}), + backoff: backoffCfg, + entries: entries{ + entries: make([]entry, 0), + }, + } + + go p.run(pushPeriod) + return p, nil +} + +// WriteEntry implements EntryWriter +func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) { + p.entries.add(entry{ts: ts, entry: e, labels: lbls}) +} + +// Stop will cancel any ongoing requests and stop the goroutine listening for requests +func (p *Push) Stop() { + if p.quit != nil { + close(p.quit) + p.quit = nil + } +} + +// buildPayload creates the snappy compressed protobuf to send to Loki +func (p *Push) buildPayload() ([]byte, error) { + entries := p.entries.reset() + + entriesByStream := make(map[string][]logproto.Entry) + for _, e := range entries { + stream := e.labels.String() + entries, ok := entriesByStream[stream] + if !ok { + entries = make([]logproto.Entry, 0) + } + + entries = append(entries, logproto.Entry{ + Timestamp: e.ts, + Line: e.entry, + }) + entriesByStream[stream] = entries + } + + streams := make([]logproto.Stream, 0, len(entriesByStream)) + for s, entries := range entriesByStream { + lbls, err := syntax.ParseLabels(s) + if err != nil { + continue + } + + streams = append(streams, logproto.Stream{ + Labels: s, + Entries: entries, + Hash: lbls.Hash(), + }) + } + + req := &logproto.PushRequest{ + Streams: streams, + } + payload, err := proto.Marshal(req) + if err != nil { + return []byte{}, fmt.Errorf("failed to marshal payload to json: %w", err) + } + + payload = snappy.Encode(nil, payload) + + return payload, nil +} + +// run pulls lines out of the channel and sends them to Loki +func (p *Push) run(pushPeriod time.Duration) { + ctx, cancel := context.WithCancel(context.Background()) + 
pushTicker := time.NewTimer(pushPeriod) + defer pushTicker.Stop() + + defer func() { + pushTicker.Stop() + }() + + for { + select { + case <-p.quit: + cancel() + return + case <-pushTicker.C: + payload, err := p.buildPayload() + if err != nil { + level.Error(p.logger).Log("msg", "failed to build payload", "err", err) + continue + } + + // We will use a timeout within each attempt to send + backoff := backoff.New(context.Background(), *p.backoff) + + // send log with retry + for { + status := 0 + status, err = p.send(ctx, payload) + if err == nil { + pushTicker.Reset(pushPeriod) + break + } + + if status > 0 && status != 429 && status/100 != 5 { + level.Error(p.logger).Log("msg", "failed to send entry, server rejected push with a non-retryable status code", "status", status, "err", err) + pushTicker.Reset(pushPeriod) + break + } + + if !backoff.Ongoing() { + level.Error(p.logger).Log("msg", "failed to send entry, retries exhausted, entry will be dropped", "entry", "status", status, "error", err) + pushTicker.Reset(pushPeriod) + break + } + level.Warn(p.logger). + Log("msg", "failed to send entry, retrying", "entry", "status", status, "error", err) + backoff.Wait() + } + + } + } +} + +// send makes one attempt to send the payload to Loki +func (p *Push) send(ctx context.Context, payload []byte) (int, error) { + var ( + err error + resp *http.Response + ) + // Set a timeout for the request + ctx, cancel := context.WithTimeout(ctx, p.httpClient.Timeout) + defer cancel() + req, err := http.NewRequestWithContext(ctx, "POST", p.lokiURL, bytes.NewReader(payload)) + if err != nil { + return -1, fmt.Errorf("failed to create push request: %w", err) + } + req.Header.Set("Content-Type", p.contentType) + req.Header.Set("User-Agent", p.userAgent) + + // set org-id + if p.tenantID != "" { + req.Header.Set("X-Scope-OrgID", p.tenantID) + } + + // basic auth if provided + if p.username != "" { + req.SetBasicAuth(p.username, p.password) + } + + resp, err = p.httpClient.Do(req) + if err != nil { + return -1, fmt.Errorf("failed to push payload: %w", err) + } + status := resp.StatusCode + if status/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line) + } + + if err := resp.Body.Close(); err != nil { + level.Error(p.logger).Log("msg", "failed to close response body", "error", err) + } + + return status, err +} + +func AggregatedMetricEntry( + ts model.Time, + totalBytes, totalCount uint64, + service string, + lbls labels.Labels, +) string { + byteString := humanize.Bytes(totalBytes) + base := fmt.Sprintf( + "ts=%d bytes=%s count=%d %s=%s", + ts.UnixNano(), + byteString, + totalCount, + push.LabelServiceName, service, + ) + + for _, l := range lbls { + base += fmt.Sprintf(" %s=%s", l.Name, l.Value) + } + + return base +} diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go new file mode 100644 index 000000000000..15f0336b5f7e --- /dev/null +++ b/pkg/pattern/aggregation/push_test.go @@ -0,0 +1,335 @@ +package aggregation + +import ( + "encoding/base64" + "fmt" + "math" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/assert" + 
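For orientation, a small sketch of the log line that `AggregatedMetricEntry` produces, assuming the `aggregation` package introduced in this PR. The label values below are invented, and the humanized byte string is approximate; the field ordering follows the function above.

```go
package main

import (
	"fmt"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/pattern/aggregation"
)

func main() {
	lbls := labels.FromStrings("cluster", "dev", "namespace", "loki")

	// One aggregated sample: total bytes and line count observed for the
	// service within a downsample window, keyed by the stream's labels.
	line := aggregation.AggregatedMetricEntry(
		model.TimeFromUnix(1700000000), // sample timestamp
		4096,                           // total bytes in the window
		42,                             // total line count in the window
		"checkout",                     // discovered service name
		lbls,
	)

	fmt.Println(line)
	// Roughly:
	// ts=1700000000000000000 bytes=4.1 kB count=42 service_name=checkout cluster=dev namespace=loki
}
```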
"github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/util" +) + +const ( + testTenant = "test1" + testUsername = "user" + testPassword = "secret" + LogEntry = "%s %s\n" +) + +func Test_Push(t *testing.T) { + lbls := labels.New(labels.Label{Name: "test", Value: "test"}) + + // create dummy loki server + responses := make(chan response, 1) // buffered not to block the response handler + backoff := backoff.Config{ + MinBackoff: 300 * time.Millisecond, + MaxBackoff: 1 * time.Minute, + MaxRetries: 1, + } + + t.Run("sends log entry to loki server without TLS", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + // without TLS + push, err := NewPush( + mock.Listener.Addr().String(), + "test1", + 2*time.Second, + 1*time.Second, + config.DefaultHTTPClientConfig, + "", "", + false, + &backoff, + log.NewNopLogger(), + ) + require.NoError(t, err) + ts, payload := testPayload() + push.WriteEntry(ts, payload, lbls) + resp := <-responses + assertResponse(t, resp, false, labelSet("test", "test"), ts, payload) + }) + + t.Run("sends log entry to loki server with basic auth", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + // with basic Auth + push, err := NewPush( + mock.Listener.Addr().String(), + "test1", + 2*time.Second, + 1*time.Second, + config.DefaultHTTPClientConfig, + "user", "secret", + false, + &backoff, + log.NewNopLogger(), + ) + require.NoError(t, err) + ts, payload := testPayload() + push.WriteEntry(ts, payload, lbls) + resp := <-responses + assertResponse(t, resp, true, labelSet("test", "test"), ts, payload) + }) + + t.Run("batches push requests", func(t *testing.T) { + // mock loki server + mock := httptest.NewServer(createServerHandler(responses)) + require.NotNil(t, mock) + defer mock.Close() + + client, err := config.NewClientFromConfig( + config.DefaultHTTPClientConfig, + "pattern-ingester-push-test", + config.WithHTTP2Disabled(), + ) + require.NoError(t, err) + client.Timeout = 2 * time.Second + + u := url.URL{ + Scheme: "http", + Host: mock.Listener.Addr().String(), + Path: pushEndpoint, + } + + p := &Push{ + lokiURL: u.String(), + tenantID: "test1", + httpClient: client, + userAgent: defaultUserAgent, + contentType: defaultContentType, + username: "user", + password: "secret", + logger: log.NewNopLogger(), + quit: make(chan struct{}), + backoff: &backoff, + entries: entries{}, + } + + lbls1 := labels.New(labels.Label{Name: "test", Value: "test"}) + lbls2 := labels.New( + labels.Label{Name: "test", Value: "test"}, + labels.Label{Name: "test2", Value: "test2"}, + ) + + now := time.Now().Truncate(time.Second).UTC() + then := now.Add(-1 * time.Minute) + wayBack := now.Add(-5 * time.Minute) + + p.WriteEntry( + wayBack, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1), + lbls1, + ) + p.WriteEntry( + then, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1), + lbls1, + ) + p.WriteEntry( + now, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1), + lbls1, + ) + + p.WriteEntry( + wayBack, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2), + lbls2, + ) + p.WriteEntry( + then, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2), + lbls2, + ) + 
p.WriteEntry( + now, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2), + lbls2, + ) + + go p.run(time.Nanosecond) + + select { + case resp := <-responses: + p.Stop() + req := resp.pushReq + assert.Len(t, req.Streams, 2) + + var stream1, stream2 logproto.Stream + for _, stream := range req.Streams { + if stream.Labels == lbls1.String() { + stream1 = stream + } + + if stream.Labels == lbls2.String() { + stream2 = stream + } + } + + require.Len(t, stream1.Entries, 3) + require.Len(t, stream2.Entries, 3) + + require.Equal(t, stream1.Entries[0].Timestamp, wayBack) + require.Equal(t, stream1.Entries[1].Timestamp, then) + require.Equal(t, stream1.Entries[2].Timestamp, now) + + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1), + stream1.Entries[0].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1), + stream1.Entries[1].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1), + stream1.Entries[2].Line, + ) + + require.Equal(t, stream2.Entries[0].Timestamp, wayBack) + require.Equal(t, stream2.Entries[1].Timestamp, then) + require.Equal(t, stream2.Entries[2].Timestamp, now) + + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2), + stream2.Entries[0].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2), + stream2.Entries[1].Line, + ) + require.Equal( + t, + AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2), + stream2.Entries[2].Line, + ) + + case <-time.After(5 * time.Second): + t.Fatal("timeout") + } + }) +} + +// Test helpers + +func assertResponse(t *testing.T, resp response, testAuth bool, labels labels.Labels, ts time.Time, payload string) { + t.Helper() + + // assert metadata + assert.Equal(t, testTenant, resp.tenantID) + + var expUser, expPass string + + if testAuth { + expUser = testUsername + expPass = testPassword + } + + assert.Equal(t, expUser, resp.username) + assert.Equal(t, expPass, resp.password) + assert.Equal(t, defaultContentType, resp.contentType) + assert.Equal(t, defaultUserAgent, resp.userAgent) + + // assert stream labels + require.Len(t, resp.pushReq.Streams, 1) + assert.Equal(t, labels.String(), resp.pushReq.Streams[0].Labels) + assert.Equal(t, labels.Hash(), resp.pushReq.Streams[0].Hash) + + // assert log entry + require.Len(t, resp.pushReq.Streams, 1) + require.Len(t, resp.pushReq.Streams[0].Entries, 1) + assert.Equal(t, payload, resp.pushReq.Streams[0].Entries[0].Line) + assert.Equal(t, ts, resp.pushReq.Streams[0].Entries[0].Timestamp) +} + +type response struct { + tenantID string + pushReq logproto.PushRequest + contentType string + userAgent string + username, password string +} + +func createServerHandler(responses chan response) http.HandlerFunc { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + // Parse the request + var pushReq logproto.PushRequest + if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil { + rw.WriteHeader(500) + return + } + + var username, password string + + basicAuth := req.Header.Get("Authorization") + if basicAuth != "" { + encoded := strings.TrimPrefix(basicAuth, "Basic ") // now we have just encoded `username:password` + decoded, err := 
base64.StdEncoding.DecodeString(encoded) + if err != nil { + rw.WriteHeader(500) + return + } + toks := strings.FieldsFunc(string(decoded), func(r rune) bool { + return r == ':' + }) + username, password = toks[0], toks[1] + } + + responses <- response{ + tenantID: req.Header.Get("X-Scope-OrgID"), + contentType: req.Header.Get("Content-Type"), + userAgent: req.Header.Get("User-Agent"), + username: username, + password: password, + pushReq: pushReq, + } + + rw.WriteHeader(http.StatusOK) + }) +} + +func labelSet(keyVals ...string) labels.Labels { + if len(keyVals)%2 != 0 { + panic("not matching key-value pairs") + } + + lbls := labels.Labels{} + + for i := 0; i < len(keyVals)-1; i += 2 { + lbls = append(lbls, labels.Label{Name: keyVals[i], Value: keyVals[i+1]}) + } + + return lbls +} + +func testPayload() (time.Time, string) { + ts := time.Now().UTC() + payload := fmt.Sprintf(LogEntry, fmt.Sprint(ts.UnixNano()), "pppppp") + + return ts, payload +} diff --git a/pkg/pattern/flush_test.go b/pkg/pattern/flush_test.go index 9ee4bd436992..ea71f6055d8b 100644 --- a/pkg/pattern/flush_test.go +++ b/pkg/pattern/flush_test.go @@ -10,10 +10,14 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/kv" "github.com/grafana/dskit/ring" + ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" "github.com/grafana/dskit/user" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/pattern/iter" @@ -22,7 +26,22 @@ import ( ) func TestSweepInstance(t *testing.T) { - ing, err := New(defaultIngesterTestConfig(t), "foo", nil, log.NewNopLogger()) + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: "localhost", Addr: "ingester0"}, + {Id: "remotehost", Addr: "ingester1"}, + {Id: "otherhost", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + ring: fakeRing, + } + + ing, err := New(defaultIngesterTestConfig(t), ringClient, "foo", nil, log.NewNopLogger()) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck err = services.StartAndAwaitRunning(context.Background(), ing) @@ -95,3 +114,185 @@ func defaultIngesterTestConfig(t testing.TB) Config { return cfg } + +type fakeRingClient struct { + ring ring.ReadRing + poolClient ring_client.PoolClient +} + +func (f *fakeRingClient) StartAsync(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitRunning(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) StopAsync() { + panic("not implemented") +} + +func (f *fakeRingClient) AwaitTerminated(_ context.Context) error { + panic("not implemented") +} + +func (f *fakeRingClient) FailureCase() error { + panic("not implemented") +} + +func (f *fakeRingClient) State() services.State { + panic("not implemented") +} + +func (f *fakeRingClient) AddListener(_ services.Listener) { + panic("not implemented") +} + +func (f *fakeRingClient) Ring() ring.ReadRing { + return f.ring +} + +func (f *fakeRingClient) GetClientFor(_ string) (ring_client.PoolClient, error) { + return f.poolClient, nil +} + +type fakeRing struct { + mock.Mock +} + +// InstancesWithTokensCount 
returns the number of instances in the ring that have tokens. +func (f *fakeRing) InstancesWithTokensCount() int { + args := f.Called() + return args.Int(0) +} + +// InstancesInZoneCount returns the number of instances in the ring that are registered in given zone. +func (f *fakeRing) InstancesInZoneCount(zone string) int { + args := f.Called(zone) + return args.Int(0) +} + +// InstancesWithTokensInZoneCount returns the number of instances in the ring that are registered in given zone and have tokens. +func (f *fakeRing) InstancesWithTokensInZoneCount(zone string) int { + args := f.Called(zone) + return args.Int(0) +} + +// ZonesCount returns the number of zones for which there's at least 1 instance registered in the ring. +func (f *fakeRing) ZonesCount() int { + args := f.Called() + return args.Int(0) +} + +func (f *fakeRing) Get( + key uint32, + op ring.Operation, + bufInstances []ring.InstanceDesc, + bufStrings1, bufStrings2 []string, +) (ring.ReplicationSet, error) { + args := f.Called(key, op, bufInstances, bufStrings1, bufStrings2) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (f *fakeRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { + args := f.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (f *fakeRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { + args := f.Called(op) + return args.Get(0).(ring.ReplicationSet), args.Error(1) +} + +func (f *fakeRing) ReplicationFactor() int { + args := f.Called() + return args.Int(0) +} + +func (f *fakeRing) InstancesCount() int { + args := f.Called() + return args.Int(0) +} + +func (f *fakeRing) ShuffleShard(identifier string, size int) ring.ReadRing { + args := f.Called(identifier, size) + return args.Get(0).(ring.ReadRing) +} + +func (f *fakeRing) GetInstanceState(instanceID string) (ring.InstanceState, error) { + args := f.Called(instanceID) + return args.Get(0).(ring.InstanceState), args.Error(1) +} + +func (f *fakeRing) ShuffleShardWithLookback( + identifier string, + size int, + lookbackPeriod time.Duration, + now time.Time, +) ring.ReadRing { + args := f.Called(identifier, size, lookbackPeriod, now) + return args.Get(0).(ring.ReadRing) +} + +func (f *fakeRing) HasInstance(instanceID string) bool { + args := f.Called(instanceID) + return args.Bool(0) +} + +func (f *fakeRing) CleanupShuffleShardCache(identifier string) { + f.Called(identifier) +} + +func (f *fakeRing) GetTokenRangesForInstance(identifier string) (ring.TokenRanges, error) { + args := f.Called(identifier) + return args.Get(0).(ring.TokenRanges), args.Error(1) +} + +type mockPoolClient struct { + mock.Mock + ctx context.Context + req *logproto.PushRequest +} + +func (m *mockPoolClient) Push( + ctx context.Context, + in *push.PushRequest, + _ ...grpc.CallOption, +) (*push.PushResponse, error) { + m.ctx = ctx + m.req = in + args := m.Called(ctx, in) + return args.Get(0).(*push.PushResponse), args.Error(1) +} + +func (m *mockPoolClient) Query( + ctx context.Context, + in *logproto.QueryPatternsRequest, + opts ...grpc.CallOption, +) (logproto.Pattern_QueryClient, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(logproto.Pattern_QueryClient), args.Error(1) +} + +func (m *mockPoolClient) Check( + ctx context.Context, + in *grpc_health_v1.HealthCheckRequest, + opts ...grpc.CallOption, +) (*grpc_health_v1.HealthCheckResponse, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(*grpc_health_v1.HealthCheckResponse), args.Error(1) +} + +func (m 
*mockPoolClient) Watch( + ctx context.Context, + in *grpc_health_v1.HealthCheckRequest, + opts ...grpc.CallOption, +) (grpc_health_v1.Health_WatchClient, error) { + args := m.Called(ctx, in, opts) + return args.Get(0).(grpc_health_v1.Health_WatchClient), args.Error(1) +} + +func (m *mockPoolClient) Close() error { + args := m.Called() + return args.Error(0) +} diff --git a/pkg/pattern/ingester.go b/pkg/pattern/ingester.go index 8864a03960bc..bd43908f289d 100644 --- a/pkg/pattern/ingester.go +++ b/pkg/pattern/ingester.go @@ -16,11 +16,13 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "google.golang.org/grpc/health/grpc_health_v1" ring_client "github.com/grafana/dskit/ring/client" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/pattern/aggregation" "github.com/grafana/loki/v3/pkg/pattern/clientpool" "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" @@ -38,6 +40,9 @@ type Config struct { FlushCheckPeriod time.Duration `yaml:"flush_check_period"` MaxClusters int `yaml:"max_clusters,omitempty" doc:"description=The maximum number of detected pattern clusters that can be created by streams."` MaxEvictionRatio float64 `yaml:"max_eviction_ratio,omitempty" doc:"description=The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will throttled pattern detection."` + MetricAggregation aggregation.Config `yaml:"metric_aggregation,omitempty" doc:"description=Configures the metric aggregation and storage behavior of the pattern ingester."` + TeeConfig TeeConfig `yaml:"tee_config,omitempty" doc:"description=Configures the pattern tee which forwards requests to the pattern ingester."` + ConnectionTimeout time.Duration `yaml:"connection_timeout"` // For testing. factory ring_client.PoolFactory `yaml:"-"` @@ -47,11 +52,86 @@ type Config struct { func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlagsWithPrefix("pattern-ingester.", fs, util_log.Logger) cfg.ClientConfig.RegisterFlags(fs) - fs.BoolVar(&cfg.Enabled, "pattern-ingester.enabled", false, "Flag to enable or disable the usage of the pattern-ingester component.") - fs.IntVar(&cfg.ConcurrentFlushes, "pattern-ingester.concurrent-flushes", 32, "How many flushes can happen concurrently from each stream.") - fs.DurationVar(&cfg.FlushCheckPeriod, "pattern-ingester.flush-check-period", 1*time.Minute, "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.") - fs.IntVar(&cfg.MaxClusters, "pattern-ingester.max-clusters", drain.DefaultConfig().MaxClusters, "The maximum number of detected pattern clusters that can be created by the pattern ingester.") - fs.Float64Var(&cfg.MaxEvictionRatio, "pattern-ingester.max-eviction-ratio", drain.DefaultConfig().MaxEvictionRatio, "The maximum eviction ratio of patterns per stream. 
Once that ratio is reached, the stream will be throttled for pattern detection.") + cfg.MetricAggregation.RegisterFlagsWithPrefix(fs, "pattern-ingester.") + cfg.TeeConfig.RegisterFlags(fs, "pattern-ingester.") + + fs.BoolVar( + &cfg.Enabled, + "pattern-ingester.enabled", + false, + "Flag to enable or disable the usage of the pattern-ingester component.", + ) + fs.IntVar( + &cfg.ConcurrentFlushes, + "pattern-ingester.concurrent-flushes", + 32, + "How many flushes can happen concurrently from each stream.", + ) + fs.DurationVar( + &cfg.FlushCheckPeriod, + "pattern-ingester.flush-check-period", + 1*time.Minute, + "How often should the ingester see if there are any blocks to flush. The first flush check is delayed by a random time up to 0.8x the flush check period. Additionally, there is +/- 1% jitter added to the interval.", + ) + fs.IntVar( + &cfg.MaxClusters, + "pattern-ingester.max-clusters", + drain.DefaultConfig().MaxClusters, + "The maximum number of detected pattern clusters that can be created by the pattern ingester.", + ) + fs.Float64Var( + &cfg.MaxEvictionRatio, + "pattern-ingester.max-eviction-ratio", + drain.DefaultConfig().MaxEvictionRatio, + "The maximum eviction ratio of patterns per stream. Once that ratio is reached, the stream will be throttled for pattern detection.", + ) + fs.DurationVar( + &cfg.ConnectionTimeout, + "pattern-ingester.connection-timeout", + 2*time.Second, + "Timeout for connections between the Loki and the pattern ingester.", + ) +} + +type TeeConfig struct { + BatchSize int `yaml:"batch_size"` + BatchFlushInterval time.Duration `yaml:"batch_flush_interval"` + FlushQueueSize int `yaml:"flush_queue_size"` + FlushWorkerCount int `yaml:"flush_worker_count"` + StopFlushTimeout time.Duration `yaml:"stop_flush_timeout"` +} + +func (cfg *TeeConfig) RegisterFlags(f *flag.FlagSet, prefix string) { + f.IntVar( + &cfg.BatchSize, + prefix+"tee.batch-size", + 5000, + "The size of the batch of raw logs to send for template mining", + ) + f.DurationVar( + &cfg.BatchFlushInterval, + prefix+"tee.batch-flush-interval", + time.Second, + "The max time between batches of raw logs to send for template mining", + ) + f.IntVar( + &cfg.FlushQueueSize, + prefix+"tee.flush-queue-size", + 1000, + "The number of log flushes to queue before dropping", + ) + f.IntVar( + &cfg.FlushWorkerCount, + prefix+"tee.flush-worker-count", + 100, + "the number of concurrent workers sending logs to the template service", + ) + f.DurationVar( + &cfg.StopFlushTimeout, + prefix+"tee.stop-flush-timeout", + 30*time.Second, + "The max time we will try to flush any remaining logs to be mined when the service is stopped", + ) } func (cfg *Config) Validate() error { @@ -64,6 +144,7 @@ func (cfg *Config) Validate() error { type Ingester struct { services.Service lifecycler *ring.Lifecycler + ringClient RingClient lifecyclerWatcher *services.FailureWatcher @@ -87,6 +168,7 @@ type Ingester struct { func New( cfg Config, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -100,6 +182,7 @@ func New( i := &Ingester{ cfg: cfg, + ringClient: ringClient, logger: log.With(logger, "component", "pattern-ingester"), registerer: registerer, metrics: metrics, @@ -165,6 +248,7 @@ func (i *Ingester) stopping(_ error) error { flushQueue.Close() } i.flushQueuesDone.Wait() + i.stopWriters() return err } @@ -196,13 +280,29 @@ func (i *Ingester) loop() { flushTicker := util.NewTickerWithJitter(i.cfg.FlushCheckPeriod, j) defer flushTicker.Stop() - for { - select { - case 
<-flushTicker.C: - i.sweepUsers(false, true) - - case <-i.loopQuit: - return + if i.cfg.MetricAggregation.Enabled { + downsampleTicker := time.NewTimer(i.cfg.MetricAggregation.DownsamplePeriod) + defer downsampleTicker.Stop() + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case t := <-downsampleTicker.C: + downsampleTicker.Reset(i.cfg.MetricAggregation.DownsamplePeriod) + now := model.TimeFromUnixNano(t.UnixNano()) + i.downsampleMetrics(now) + case <-i.loopQuit: + return + } + } + } else { + for { + select { + case <-flushTicker.C: + i.sweepUsers(false, true) + case <-i.loopQuit: + return + } } } } @@ -284,11 +384,34 @@ func (i *Ingester) GetOrCreateInstance(instanceID string) (*instance, error) { / inst, ok = i.instances[instanceID] if !ok { var err error + var writer aggregation.EntryWriter + + aggCfg := i.cfg.MetricAggregation + if aggCfg.Enabled { + writer, err = aggregation.NewPush( + aggCfg.LokiAddr, + instanceID, + aggCfg.WriteTimeout, + aggCfg.PushPeriod, + aggCfg.HTTPClientConfig, + aggCfg.BasicAuth.Username, + string(aggCfg.BasicAuth.Password), + aggCfg.UseTLS, + &aggCfg.BackoffConfig, + i.logger, + ) + if err != nil { + return nil, err + } + } inst, err = newInstance( instanceID, i.logger, i.metrics, i.drainCfg, + i.ringClient, + i.lifecycler.ID, + writer, ) if err != nil { return nil, err @@ -316,3 +439,21 @@ func (i *Ingester) getInstances() []*instance { } return instances } + +func (i *Ingester) stopWriters() { + instances := i.getInstances() + + for _, instance := range instances { + if instance.writer != nil { + instance.writer.Stop() + } + } +} + +func (i *Ingester) downsampleMetrics(ts model.Time) { + instances := i.getInstances() + + for _, instance := range instances { + instance.Downsample(ts) + } +} diff --git a/pkg/pattern/ingester_querier.go b/pkg/pattern/ingester_querier.go index 2220a2ef41d8..a77dd47b3113 100644 --- a/pkg/pattern/ingester_querier.go +++ b/pkg/pattern/ingester_querier.go @@ -27,7 +27,7 @@ type IngesterQuerier struct { cfg Config logger log.Logger - ringClient *RingClient + ringClient RingClient registerer prometheus.Registerer ingesterQuerierMetrics *ingesterQuerierMetrics @@ -35,7 +35,7 @@ type IngesterQuerier struct { func NewIngesterQuerier( cfg Config, - ringClient *RingClient, + ringClient RingClient, metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, @@ -128,7 +128,7 @@ func prunePatterns(resp *logproto.QueryPatternsResponse, minClusterSize int64, m // ForAllIngesters runs f, in parallel, for all ingesters func (q *IngesterQuerier) forAllIngesters(ctx context.Context, f func(context.Context, logproto.PatternClient) (interface{}, error)) ([]ResponseFromIngesters, error) { - replicationSet, err := q.ringClient.ring.GetAllHealthy(ring.Read) + replicationSet, err := q.ringClient.Ring().GetAllHealthy(ring.Read) if err != nil { return nil, err } @@ -149,7 +149,7 @@ func (q *IngesterQuerier) forGivenIngesters(ctx context.Context, replicationSet ingester := ingester i := i g.Go(func() error { - client, err := q.ringClient.pool.GetClientFor(ingester.Addr) + client, err := q.ringClient.GetClientFor(ingester.Addr) if err != nil { return err } diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 90b1845a90c3..a5dd5cdbaaed 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -7,24 +7,56 @@ import ( "time" "github.com/go-kit/log" + "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/mock" 
"github.com/stretchr/testify/require" + "github.com/grafana/dskit/ring" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/pattern/aggregation" "github.com/grafana/loki/v3/pkg/pattern/iter" + "github.com/grafana/loki/v3/pkg/util/constants" "github.com/grafana/loki/v3/pkg/pattern/drain" + loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/pkg/push" ) func TestInstancePushQuery(t *testing.T) { lbs := labels.New(labels.Label{Name: "test", Value: "test"}) + + ingesterID := "foo" + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: ingesterID, Addr: "ingester0"}, + {Id: "bar", Addr: "ingester1"}, + {Id: "baz", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + ring: fakeRing, + } + + mockWriter := &mockEntryWriter{} + mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything) + inst, err := newInstance( "foo", log.NewNopLogger(), newIngesterMetrics(nil, "test"), drain.DefaultConfig(), + ringClient, + ingesterID, + mockWriter, ) require.NoError(t, err) @@ -68,3 +100,239 @@ func TestInstancePushQuery(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, len(res.Series)) } + +func TestInstancePushAggregateMetrics(t *testing.T) { + lbs := labels.New( + labels.Label{Name: "test", Value: "test"}, + labels.Label{Name: "service_name", Value: "test_service"}, + ) + lbs2 := labels.New( + labels.Label{Name: "foo", Value: "bar"}, + labels.Label{Name: "service_name", Value: "foo_service"}, + ) + lbs3 := labels.New( + labels.Label{Name: "foo", Value: "baz"}, + labels.Label{Name: "service_name", Value: "baz_service"}, + ) + + setup := func() (*instance, *mockEntryWriter) { + ingesterID := "foo" + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: ingesterID, Addr: "ingester0"}, + {Id: "bar", Addr: "ingester1"}, + {Id: "baz", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything). 
+ Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + ring: fakeRing, + } + + mockWriter := &mockEntryWriter{} + mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything) + + inst, err := newInstance( + "foo", + log.NewNopLogger(), + newIngesterMetrics(nil, "test"), + drain.DefaultConfig(), + ringClient, + ingesterID, + mockWriter, + ) + require.NoError(t, err) + + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "ts=1 msg=hello", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{ + Name: constants.LevelLabel, + Value: "info", + }, + }, + }, + }, + }, + { + Labels: lbs2.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "ts=1 msg=hello", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{ + Name: constants.LevelLabel, + Value: "error", + }, + }, + }, + }, + }, + { + Labels: lbs3.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "error error error", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{ + Name: constants.LevelLabel, + Value: "error", + }, + }, + }, + }, + }, + }, + }) + for i := 0; i < 30; i++ { + err = inst.Push(context.Background(), &push.PushRequest{ + Streams: []push.Stream{ + { + Labels: lbs.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "foo bar foo bar", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{ + Name: constants.LevelLabel, + Value: "info", + }, + }, + }, + }, + }, + { + Labels: lbs2.String(), + Entries: []push.Entry{ + { + Timestamp: time.Unix(20, 0), + Line: "foo bar foo bar", + StructuredMetadata: push.LabelsAdapter{ + push.LabelAdapter{ + Name: constants.LevelLabel, + Value: "error", + }, + }, + }, + }, + }, + }, + }) + require.NoError(t, err) + } + require.NoError(t, err) + + return inst, mockWriter + } + + t.Run("accumulates bytes and count for each stream and level on every push", func(t *testing.T) { + inst, _ := setup() + + require.Len(t, inst.aggMetricsByStreamAndLevel, 3) + + require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStreamAndLevel[lbs.String()]["info"].bytes) + require.Equal(t, uint64(14+(15*30)), inst.aggMetricsByStreamAndLevel[lbs2.String()]["error"].bytes) + require.Equal(t, uint64(17), inst.aggMetricsByStreamAndLevel[lbs3.String()]["error"].bytes) + + require.Equal( + t, + uint64(31), + inst.aggMetricsByStreamAndLevel[lbs.String()]["info"].count, + ) + require.Equal( + t, + uint64(31), + inst.aggMetricsByStreamAndLevel[lbs2.String()]["error"].count, + ) + require.Equal( + t, + uint64(1), + inst.aggMetricsByStreamAndLevel[lbs3.String()]["error"].count, + ) + }, + ) + + t.Run("downsamples aggregated metrics", func(t *testing.T) { + inst, mockWriter := setup() + now := model.Now() + inst.Downsample(now) + + mockWriter.AssertCalled( + t, + "WriteEntry", + now.Time(), + aggregation.AggregatedMetricEntry( + now, + uint64(14+(15*30)), + uint64(31), + "test_service", + lbs, + ), + labels.New( + labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "test_service"}, + labels.Label{Name: "level", Value: "info"}, + ), + ) + + mockWriter.AssertCalled( + t, + "WriteEntry", + now.Time(), + aggregation.AggregatedMetricEntry( + now, + uint64(14+(15*30)), + uint64(31), + "foo_service", + lbs2, + ), + labels.New( + labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "foo_service"}, + labels.Label{Name: "level", Value: "error"}, + ), + ) + + 
mockWriter.AssertCalled( + t, + "WriteEntry", + now.Time(), + aggregation.AggregatedMetricEntry( + now, + uint64(17), + uint64(1), + "baz_service", + lbs3, + ), + labels.New( + labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "baz_service"}, + labels.Label{Name: "level", Value: "error"}, + ), + ) + + require.Equal(t, 0, len(inst.aggMetricsByStreamAndLevel)) + }) +} + +type mockEntryWriter struct { + mock.Mock +} + +func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) { + _ = m.Called(ts, entry, lbls) +} + +func (m *mockEntryWriter) Stop() { + _ = m.Called() +} diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index e19ba040ff71..719f90d69075 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -2,22 +2,31 @@ package pattern import ( "context" + "errors" "fmt" "net/http" + "strings" + "sync" "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/grafana/dskit/multierror" + "github.com/grafana/dskit/ring" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/index" + "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/pattern/aggregation" "github.com/grafana/loki/v3/pkg/pattern/drain" "github.com/grafana/loki/v3/pkg/pattern/iter" "github.com/grafana/loki/v3/pkg/util" + "github.com/grafana/loki/v3/pkg/util/constants" + lokiring "github.com/grafana/loki/v3/pkg/util/ring" ) const indexShards = 32 @@ -32,21 +41,45 @@ type instance struct { logger log.Logger metrics *ingesterMetrics drainCfg *drain.Config + ringClient RingClient + ingesterID string + + aggMetricsLock sync.Mutex + aggMetricsByStreamAndLevel map[string]map[string]*aggregatedMetrics + + writer aggregation.EntryWriter +} + +type aggregatedMetrics struct { + bytes uint64 + count uint64 } -func newInstance(instanceID string, logger log.Logger, metrics *ingesterMetrics, drainCfg *drain.Config) (*instance, error) { +func newInstance( + instanceID string, + logger log.Logger, + metrics *ingesterMetrics, + drainCfg *drain.Config, + ringClient RingClient, + ingesterID string, + writer aggregation.EntryWriter, +) (*instance, error) { index, err := index.NewBitPrefixWithShards(indexShards) if err != nil { return nil, err } i := &instance{ - buf: make([]byte, 0, 1024), - logger: logger, - instanceID: instanceID, - streams: newStreamsMap(), - index: index, - metrics: metrics, - drainCfg: drainCfg, + buf: make([]byte, 0, 1024), + logger: logger, + instanceID: instanceID, + streams: newStreamsMap(), + index: index, + metrics: metrics, + drainCfg: drainCfg, + ringClient: ringClient, + ingesterID: ingesterID, + aggMetricsByStreamAndLevel: make(map[string]map[string]*aggregatedMetrics), + writer: writer, } i.mapper = ingester.NewFPMapper(i.getLabelsFromFingerprint) return i, nil @@ -58,27 +91,69 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { appendErr := multierror.New() for _, reqStream := range req.Streams { - if reqStream.Entries == nil || len(reqStream.Entries) == 0 { - continue - } - s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, - func() (*stream, error) { - // add stream - return i.createStream(ctx, reqStream) - }, nil) + // All streams are observed for metrics + // TODO(twhitney): this would be better as a queue that drops in response to backpressure 
+ i.Observe(reqStream.Labels, reqStream.Entries) + + // But only owned streamed are processed for patterns + ownedStream, err := i.isOwnedStream(i.ingesterID, reqStream.Labels) if err != nil { appendErr.Add(err) - continue } - err = s.Push(ctx, reqStream.Entries) - if err != nil { - appendErr.Add(err) - continue + + if ownedStream { + if reqStream.Entries == nil || len(reqStream.Entries) == 0 { + continue + } + s, _, err := i.streams.LoadOrStoreNew(reqStream.Labels, + func() (*stream, error) { + // add stream + return i.createStream(ctx, reqStream) + }, nil) + if err != nil { + appendErr.Add(err) + continue + } + err = s.Push(ctx, reqStream.Entries) + if err != nil { + appendErr.Add(err) + continue + } } } + return appendErr.Err() } +func (i *instance) isOwnedStream(ingesterID string, stream string) (bool, error) { + var descs [1]ring.InstanceDesc + replicationSet, err := i.ringClient.Ring().Get( + lokiring.TokenFor(i.instanceID, stream), + ring.WriteNoExtend, + descs[:0], + nil, + nil, + ) + if err != nil { + return false, fmt.Errorf( + "error getting replication set for stream %s: %v", + stream, + err, + ) + } + + if replicationSet.Instances == nil { + return false, errors.New("no instances found") + } + + for _, instanceDesc := range replicationSet.Instances { + if instanceDesc.Id == ingesterID { + return true, nil + } + } + return false, nil +} + // Iterator returns an iterator of pattern samples matching the given query patterns request. func (i *instance) Iterator(ctx context.Context, req *logproto.QueryPatternsRequest) (iter.Iterator, error) { matchers, err := syntax.ParseMatchers(req.Query, true) @@ -174,3 +249,89 @@ func (i *instance) removeStream(s *stream) { i.index.Delete(s.labels, s.fp) } } + +func (i *instance) Observe(stream string, entries []logproto.Entry) { + i.aggMetricsLock.Lock() + defer i.aggMetricsLock.Unlock() + + for _, entry := range entries { + lvl := constants.LogLevelUnknown + structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) + if structuredMetadata.Has(constants.LevelLabel) { + lvl = strings.ToLower(structuredMetadata.Get(constants.LevelLabel)) + } + + streamMetrics, ok := i.aggMetricsByStreamAndLevel[stream] + + if !ok { + streamMetrics = make(map[string]*aggregatedMetrics, len(constants.LogLevels)) + for _, l := range constants.LogLevels { + streamMetrics[l] = &aggregatedMetrics{} + } + } + + if _, ok := streamMetrics[lvl]; !ok { + level.Warn(i.logger).Log( + "msg", "unknown log level while observing stream", + "level", lvl, + "stream", stream, + ) + + lvl = constants.LogLevelUnknown + } + + streamMetrics[lvl].bytes += uint64(len(entry.Line)) + streamMetrics[lvl].count++ + + i.aggMetricsByStreamAndLevel[stream] = streamMetrics + } +} + +func (i *instance) Downsample(now model.Time) { + i.aggMetricsLock.Lock() + defer func() { + i.aggMetricsByStreamAndLevel = make(map[string]map[string]*aggregatedMetrics) + i.aggMetricsLock.Unlock() + }() + + for stream, metricsByLevel := range i.aggMetricsByStreamAndLevel { + lbls, err := syntax.ParseLabels(stream) + if err != nil { + continue + } + + for level, metrics := range metricsByLevel { + // we start with an empty bucket for each level, so only write if we have metrics + if metrics.count > 0 { + i.writeAggregatedMetrics(now, lbls, level, metrics.bytes, metrics.count) + } + } + } +} + +func (i *instance) writeAggregatedMetrics( + now model.Time, + streamLbls labels.Labels, + level string, + totalBytes, totalCount uint64, +) { + service := streamLbls.Get(push.LabelServiceName) + if 
service == "" { + service = push.ServiceUnknown + } + + newLbls := labels.Labels{ + labels.Label{Name: push.AggregatedMetricLabel, Value: service}, + labels.Label{Name: "level", Value: level}, + } + + if i.writer != nil { + i.writer.WriteEntry( + now.Time(), + aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls), + newLbls, + ) + + i.metrics.samples.WithLabelValues(service).Inc() + } +} diff --git a/pkg/pattern/metrics.go b/pkg/pattern/metrics.go index f6f8289c7d17..25bbd1fd1f6e 100644 --- a/pkg/pattern/metrics.go +++ b/pkg/pattern/metrics.go @@ -11,6 +11,7 @@ type ingesterMetrics struct { patternsDetectedTotal *prometheus.CounterVec tokensPerLine *prometheus.HistogramVec statePerLine *prometheus.HistogramVec + samples *prometheus.CounterVec } func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *ingesterMetrics { @@ -47,6 +48,12 @@ func newIngesterMetrics(r prometheus.Registerer, metricsNamespace string) *inges Help: "The number of items of additional state returned alongside tokens for pattern recognition.", Buckets: []float64{20, 40, 80, 120, 160, 320, 640, 1280}, }, []string{"tenant", "format"}), + samples: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: metricsNamespace, + Subsystem: "pattern_ingester", + Name: "metric_samples", + Help: "The total number of samples created to write back to Loki.", + }, []string{"service_name"}), } } diff --git a/pkg/pattern/ring_client.go b/pkg/pattern/ring_client.go index 3ceaf481a3b9..72739e0c0849 100644 --- a/pkg/pattern/ring_client.go +++ b/pkg/pattern/ring_client.go @@ -13,7 +13,13 @@ import ( "github.com/grafana/loki/v3/pkg/pattern/clientpool" ) -type RingClient struct { +type RingClient interface { + services.Service + Ring() ring.ReadRing + GetClientFor(addr string) (ring_client.PoolClient, error) +} + +type ringClient struct { cfg Config logger log.Logger @@ -29,10 +35,10 @@ func NewRingClient( metricsNamespace string, registerer prometheus.Registerer, logger log.Logger, -) (*RingClient, error) { +) (RingClient, error) { var err error registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - ringClient := &RingClient{ + ringClient := &ringClient{ logger: log.With(logger, "component", "pattern-ring-client"), cfg: cfg, } @@ -59,19 +65,55 @@ func NewRingClient( return ringClient, nil } -func (q *RingClient) starting(ctx context.Context) error { - return services.StartManagerAndAwaitHealthy(ctx, q.subservices) +func (r *ringClient) starting(ctx context.Context) error { + return services.StartManagerAndAwaitHealthy(ctx, r.subservices) } -func (q *RingClient) running(ctx context.Context) error { +func (r *ringClient) running(ctx context.Context) error { select { case <-ctx.Done(): return nil - case err := <-q.subservicesWatcher.Chan(): + case err := <-r.subservicesWatcher.Chan(): return fmt.Errorf("pattern tee subservices failed: %w", err) } } -func (q *RingClient) stopping(_ error) error { - return services.StopManagerAndAwaitStopped(context.Background(), q.subservices) +func (r *ringClient) stopping(_ error) error { + return services.StopManagerAndAwaitStopped(context.Background(), r.subservices) +} + +func (r *ringClient) Ring() ring.ReadRing { + return r.ring +} + +func (r *ringClient) StartAsync(ctx context.Context) error { + return r.ring.StartAsync(ctx) +} + +func (r *ringClient) AwaitRunning(ctx context.Context) error { + return r.ring.AwaitRunning(ctx) +} + +func (r *ringClient) StopAsync() { + r.ring.StopAsync() +} + +func (r *ringClient) 
AwaitTerminated(ctx context.Context) error { + return r.ring.AwaitTerminated(ctx) +} + +func (r *ringClient) FailureCase() error { + return r.ring.FailureCase() +} + +func (r *ringClient) State() services.State { + return r.ring.State() +} + +func (r *ringClient) AddListener(listener services.Listener) { + r.ring.AddListener(listener) +} + +func (r *ringClient) GetClientFor(addr string) (ring_client.PoolClient, error) { + return r.pool.GetClientFor(addr) } diff --git a/pkg/pattern/tee.go b/pkg/pattern/tee.go deleted file mode 100644 index 70fb37e1b692..000000000000 --- a/pkg/pattern/tee.go +++ /dev/null @@ -1,88 +0,0 @@ -package pattern - -import ( - "context" - "errors" - - "github.com/go-kit/log" - "github.com/go-kit/log/level" - "github.com/grafana/dskit/ring" - "github.com/grafana/dskit/user" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - - "github.com/grafana/loki/v3/pkg/distributor" - "github.com/grafana/loki/v3/pkg/logproto" -) - -type Tee struct { - cfg Config - logger log.Logger - ringClient *RingClient - - ingesterAppends *prometheus.CounterVec -} - -func NewTee( - cfg Config, - ringClient *RingClient, - metricsNamespace string, - registerer prometheus.Registerer, - logger log.Logger, -) (*Tee, error) { - registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) - - t := &Tee{ - logger: log.With(logger, "component", "pattern-tee"), - ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ - Name: "pattern_ingester_appends_total", - Help: "The total number of batch appends sent to pattern ingesters.", - }, []string{"ingester", "status"}), - cfg: cfg, - ringClient: ringClient, - } - - return t, nil -} - -// Duplicate Implements distributor.Tee which is used to tee distributor requests to pattern ingesters. 
-func (t *Tee) Duplicate(tenant string, streams []distributor.KeyedStream) { - for idx := range streams { - go func(stream distributor.KeyedStream) { - if err := t.sendStream(tenant, stream); err != nil { - level.Error(t.logger).Log("msg", "failed to send stream to pattern ingester", "err", err) - } - }(streams[idx]) - } -} - -func (t *Tee) sendStream(tenant string, stream distributor.KeyedStream) error { - var descs [1]ring.InstanceDesc - replicationSet, err := t.ringClient.ring.Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) - if err != nil { - return err - } - if replicationSet.Instances == nil { - return errors.New("no instances found") - } - addr := replicationSet.Instances[0].Addr - client, err := t.ringClient.pool.GetClientFor(addr) - if err != nil { - return err - } - req := &logproto.PushRequest{ - Streams: []logproto.Stream{ - stream.Stream, - }, - } - - ctx, cancel := context.WithTimeout(user.InjectOrgID(context.Background(), tenant), t.cfg.ClientConfig.RemoteTimeout) - defer cancel() - _, err = client.(logproto.PatternClient).Push(ctx, req) - if err != nil { - t.ingesterAppends.WithLabelValues(addr, "fail").Inc() - return err - } - t.ingesterAppends.WithLabelValues(addr, "success").Inc() - return nil -} diff --git a/pkg/pattern/tee_service.go b/pkg/pattern/tee_service.go new file mode 100644 index 000000000000..13058fbaeb46 --- /dev/null +++ b/pkg/pattern/tee_service.go @@ -0,0 +1,401 @@ +package pattern + +import ( + "context" + "errors" + "strings" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/util/pool" + + "github.com/grafana/dskit/instrument" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/user" + + "github.com/grafana/loki/v3/pkg/distributor" + "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/util/constants" + + ring_client "github.com/grafana/dskit/ring/client" +) + +type TeeService struct { + cfg Config + logger log.Logger + ringClient RingClient + wg *sync.WaitGroup + + ingesterAppends *prometheus.CounterVec + ingesterMetricAppends *prometheus.CounterVec + + teedStreams *prometheus.CounterVec + teedRequests *prometheus.CounterVec + + sendDuration *instrument.HistogramCollector + + flushQueue chan clientRequest + + bufferPool *pool.Pool + buffersMutex *sync.Mutex + buffers map[string][]distributor.KeyedStream +} + +func NewTeeService( + cfg Config, + ringClient RingClient, + metricsNamespace string, + registerer prometheus.Registerer, + logger log.Logger, +) (*TeeService, error) { + registerer = prometheus.WrapRegistererWithPrefix(metricsNamespace+"_", registerer) + + t := &TeeService{ + logger: log.With(logger, "component", "pattern-tee"), + ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "pattern_ingester_appends_total", + Help: "The total number of batch appends sent to pattern ingesters.", + }, []string{"ingester", "status"}), + ingesterMetricAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Name: "pattern_ingester_metric_appends_total", + Help: "The total number of metric only batch appends sent to pattern ingesters. 
These requests will not be processed for patterns.",
+		}, []string{"status"}),
+		teedStreams: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Name: "pattern_ingester_teed_streams_total",
+			Help: "The total number of streams teed to the pattern ingester.",
+		}, []string{"status"}),
+		teedRequests: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
+			Name: "pattern_ingester_teed_requests_total",
+			Help: "The total number of batch appends sent to fallback pattern ingesters, for not owned streams.",
+		}, []string{"status"}),
+		sendDuration: instrument.NewHistogramCollector(
+			promauto.With(registerer).NewHistogramVec(
+				prometheus.HistogramOpts{
+					Namespace: constants.Loki,
+					Name:      "pattern_ingester_tee_send_duration_seconds",
+					Help:      "Time spent sending batches from the tee to the pattern ingester",
+					Buckets:   prometheus.DefBuckets,
+				}, instrument.HistogramCollectorBuckets,
+			),
+		),
+		cfg:        cfg,
+		ringClient: ringClient,
+
+		wg:           &sync.WaitGroup{},
+		buffersMutex: &sync.Mutex{},
+		buffers:      make(map[string][]distributor.KeyedStream),
+		flushQueue:   make(chan clientRequest, cfg.TeeConfig.FlushQueueSize),
+	}
+
+	return t, nil
+}
+
+func (ts *TeeService) Start(runCtx context.Context) error {
+	ts.wg.Add(1)
+
+	// Start all batchSenders. We don't use the Run() context here, because we
+	// want the senders to finish sending any currently in-flight data and the
+	// remaining batches in the queue before the TeeService fully stops.
+	//
+	// Still, we have a maximum amount of time we will wait after the TeeService
+	// is stopped, see cfg.StopFlushTimeout below.
+	senderCtx, senderCancel := context.WithCancel(context.Background())
+
+	sendersWg := &sync.WaitGroup{}
+	sendersWg.Add(ts.cfg.TeeConfig.FlushWorkerCount)
+	for i := 0; i < ts.cfg.TeeConfig.FlushWorkerCount; i++ {
+		go func() {
+			ts.batchSender(senderCtx)
+			sendersWg.Done()
+		}()
+	}
+
+	// We need this to implement the select with StopFlushTimeout below
+	sendersDone := make(chan struct{})
+	go func() {
+		sendersWg.Wait()
+		close(sendersDone)
+	}()
+
+	go func() {
+		// We wait for the Run() context to be done, so we know we are stopping
+		<-runCtx.Done()
+
+		// The senders either stop normally in the allotted time, or we hit the
+		// timeout and cancel their context. In either case, we wait for them to
+		// finish before we consider the service to be done.
+ select { + case <-time.After(ts.cfg.TeeConfig.StopFlushTimeout): + senderCancel() // Cancel any remaining senders + <-sendersDone // Wait for them to be done + case <-sendersDone: + } + ts.wg.Done() + }() + + go func() { + t := time.NewTicker(ts.cfg.TeeConfig.BatchFlushInterval) + defer t.Stop() + for { + select { + case <-t.C: + ts.flush() + case <-runCtx.Done(): + // Final flush to send anything currently buffered + ts.flush() + close(ts.flushQueue) // nothing will write to it anymore + return + } + } + }() + + return nil +} + +func (ts *TeeService) WaitUntilDone() { + ts.wg.Wait() +} + +func (ts *TeeService) flush() { + ts.buffersMutex.Lock() + if len(ts.buffers) == 0 { + ts.buffersMutex.Unlock() + return + } + + buffered := ts.buffers + ts.buffers = make(map[string][]distributor.KeyedStream) + ts.buffersMutex.Unlock() + + batches := make([]map[string]map[string]*logproto.PushRequest, 0, len(buffered)) + for tenant, streams := range buffered { + batches = append(batches, ts.batchesForTenant(tenant, streams)) + } + + byTenantAndPatternIngester := make(map[string]map[string][]*logproto.PushRequest) + for _, b := range batches { + for tenant, requests := range b { + for addr, req := range requests { + byTenant, ok := byTenantAndPatternIngester[tenant] + if !ok { + byTenant = make(map[string][]*logproto.PushRequest) + } + + byTenant[addr] = append( + byTenant[addr], + req, + ) + + byTenantAndPatternIngester[tenant] = byTenant + } + } + } + + for tenant, requests := range byTenantAndPatternIngester { + for addr, reqs := range requests { + select { + case ts.flushQueue <- clientRequest{ + ingesterAddr: addr, + tenant: tenant, + reqs: reqs, + }: + ts.teedRequests.WithLabelValues("queued").Inc() + default: + ts.teedRequests.WithLabelValues("dropped").Inc() + } + } + } +} + +func (ts *TeeService) batchesForTenant( + tenant string, + streams []distributor.KeyedStream, +) map[string]map[string]*logproto.PushRequest { + batches := map[string]map[string]*logproto.PushRequest{ + tenant: make(map[string]*logproto.PushRequest), + } + + if len(streams) == 0 { + return batches + } + + for _, stream := range streams { + var descs [1]ring.InstanceDesc + replicationSet, err := ts.ringClient.Ring(). 
+ Get(stream.HashKey, ring.WriteNoExtend, descs[:0], nil, nil) + if err != nil || len(replicationSet.Instances) == 0 { + ts.teedStreams.WithLabelValues("dropped").Inc() + continue + } + + addr := replicationSet.Instances[0].Addr + batch, ok := batches[tenant][addr] + if !ok { + batch = &logproto.PushRequest{} + batches[tenant][addr] = batch + } + + if len(stream.Stream.Entries) > 0 { + batch.Streams = append(batch.Streams, stream.Stream) + ts.teedStreams.WithLabelValues("batched").Inc() + } + } + + streamCount := uint64(len(streams)) + level.Debug(ts.logger).Log( + "msg", "prepared pattern Tee batches for tenant", + "tenant", tenant, + "stream_count", streamCount, + ) + + return batches +} + +type clientRequest struct { + ingesterAddr string + tenant string + reqs []*logproto.PushRequest +} + +func (ts *TeeService) batchSender(ctx context.Context) { + for { + select { + case clientReq, ok := <-ts.flushQueue: + if !ok { + return // we are done, the queue was closed by Run() + } + ts.sendBatch(ctx, clientReq) + case <-ctx.Done(): + return + } + } +} + +func (ts *TeeService) sendBatch(ctx context.Context, clientRequest clientRequest) { + ctx, cancel := context.WithTimeout(ctx, ts.cfg.ConnectionTimeout) + defer cancel() + + for i := 0; i < len(clientRequest.reqs); i++ { + req := clientRequest.reqs[i] + + if len(req.Streams) == 0 { + continue + } + + // Nothing to do with this error. It's recorded in the metrics that + // are gathered by this request + _ = instrument.CollectedRequest( + ctx, + "FlushTeedLogsToPatternIngested", + ts.sendDuration, + instrument.ErrorCode, + func(ctx context.Context) error { + client, err := ts.ringClient.GetClientFor(clientRequest.ingesterAddr) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout( + user.InjectOrgID(ctx, clientRequest.tenant), + ts.cfg.ClientConfig.RemoteTimeout, + ) + + // First try to send the request to the correct pattern ingester + defer cancel() + _, err = client.(logproto.PatternClient).Push(ctx, req) + if err == nil { + // Success here means the stream will be processed for both metrics and patterns + ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "success").Inc() + ts.ingesterMetricAppends.WithLabelValues("success").Inc() + return nil + } + + // The pattern ingester appends failed, but we can retry the metric append + ts.ingesterAppends.WithLabelValues(clientRequest.ingesterAddr, "fail").Inc() + level.Error(ts.logger).Log("msg", "failed to send patterns to pattern ingester", "err", err) + + if !ts.cfg.MetricAggregation.Enabled { + return err + } + + // Pattern ingesters serve 2 functions, processing patterns and aggregating metrics. + // Only owned streams are processed for patterns, however any pattern ingester can + // aggregate metrics for any stream. Therefore, if we can't send the owned stream, + // try to forward request to any pattern ingester so we at least capture the metrics. + replicationSet, err := ts.ringClient.Ring(). 
+				GetReplicationSetForOperation(ring.WriteNoExtend)
+			if err != nil || len(replicationSet.Instances) == 0 {
+				ts.ingesterMetricAppends.WithLabelValues("fail").Inc()
+				level.Error(ts.logger).Log(
+					"msg", "failed to send metrics to fallback pattern ingesters",
+					"num_instances", len(replicationSet.Instances),
+					"err", err,
+				)
+				return errors.New("no instances found for fallback")
+			}
+
+			fallbackAddrs := make([]string, 0, len(replicationSet.Instances))
+			for _, instance := range replicationSet.Instances {
+				addr := instance.Addr
+				fallbackAddrs = append(fallbackAddrs, addr)
+
+				var client ring_client.PoolClient
+				client, err = ts.ringClient.GetClientFor(addr)
+				if err == nil {
+					ctx, cancel := context.WithTimeout(
+						user.InjectOrgID(ctx, clientRequest.tenant),
+						ts.cfg.ClientConfig.RemoteTimeout,
+					)
+					defer cancel()
+
+					_, err = client.(logproto.PatternClient).Push(ctx, req)
+					if err != nil {
+						continue
+					}
+
+					ts.ingesterMetricAppends.WithLabelValues("success").Inc()
+					// bail after any success to prevent sending more than one
+					return nil
+				}
+			}
+
+			ts.ingesterMetricAppends.WithLabelValues("fail").Inc()
+			level.Error(ts.logger).Log(
+				"msg", "failed to send metrics to fallback pattern ingesters. exhausted all fallback instances",
+				"addresses", strings.Join(fallbackAddrs, ", "),
+				"err", err,
+			)
+			return err
+		})
+	}
+}
+
+// Duplicate implements distributor.Tee which is used to tee distributor requests to pattern ingesters.
+func (ts *TeeService) Duplicate(tenant string, streams []distributor.KeyedStream) {
+	if !ts.cfg.Enabled {
+		return
+	}
+
+	if len(streams) == 0 {
+		return
+	}
+
+	for _, stream := range streams {
+		lbls, err := syntax.ParseLabels(stream.Stream.Labels)
+		if err != nil || lbls.Has(push.AggregatedMetricLabel) {
+			level.Error(ts.logger).
+ Log("msg", "error parsing stream labels", "labels", stream.Stream.Labels, "err", err) + + continue + } + + ts.buffersMutex.Lock() + ts.buffers[tenant] = append(ts.buffers[tenant], stream) + ts.buffersMutex.Unlock() + } +} diff --git a/pkg/pattern/tee_service_test.go b/pkg/pattern/tee_service_test.go new file mode 100644 index 000000000000..1be8114df022 --- /dev/null +++ b/pkg/pattern/tee_service_test.go @@ -0,0 +1,183 @@ +package pattern + +import ( + "context" + "flag" + "slices" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/grafana/dskit/ring" + "github.com/grafana/dskit/user" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/v3/pkg/distributor" + "github.com/grafana/loki/v3/pkg/logproto" + + "github.com/grafana/loki/pkg/push" +) + +func getTestTee(t *testing.T) (*TeeService, *mockPoolClient) { + cfg := Config{} + cfg.RegisterFlags(flag.NewFlagSet("test", flag.PanicOnError)) // set up defaults + + cfg.Enabled = true + + response := &logproto.PushResponse{} + client := &mockPoolClient{} + client.On("Push", mock.Anything, mock.Anything).Return(response, nil) + + replicationSet := ring.ReplicationSet{ + Instances: []ring.InstanceDesc{ + {Id: "localhost", Addr: "ingester0"}, + {Id: "remotehost", Addr: "ingester1"}, + {Id: "otherhost", Addr: "ingester2"}, + }, + } + + fakeRing := &fakeRing{} + fakeRing.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(replicationSet, nil) + + ringClient := &fakeRingClient{ + poolClient: client, + ring: fakeRing, + } + + logsTee, err := NewTeeService( + cfg, + ringClient, + "test", + nil, + log.NewNopLogger(), + ) + require.NoError(t, err) + + return logsTee, client +} + +func TestPatternTeeBasic(t *testing.T) { + tee, client := getTestTee(t) + + ctx, cancel := context.WithCancel(context.Background()) + + require.NoError(t, tee.Start(ctx)) + + now := time.Now() + tee.Duplicate("test-tenant", []distributor.KeyedStream{ + {HashKey: 123, Stream: push.Stream{ + Labels: `{foo="bar"}`, + Entries: []push.Entry{ + {Timestamp: now, Line: "foo1"}, + {Timestamp: now.Add(1 * time.Second), Line: "bar1"}, + {Timestamp: now.Add(2 * time.Second), Line: "baz1"}, + }, + }}, + }) + + tee.Duplicate("test-tenant", []distributor.KeyedStream{ + {HashKey: 123, Stream: push.Stream{ + Labels: `{foo="bar"}`, + Entries: []push.Entry{ + {Timestamp: now.Add(3 * time.Second), Line: "foo2"}, + {Timestamp: now.Add(4 * time.Second), Line: "bar2"}, + {Timestamp: now.Add(5 * time.Second), Line: "baz2"}, + }, + }}, + }) + + tee.Duplicate("test-tenant", []distributor.KeyedStream{ + {HashKey: 456, Stream: push.Stream{ + Labels: `{ping="pong"}`, + Entries: []push.Entry{ + {Timestamp: now.Add(1 * time.Second), Line: "ping"}, + {Timestamp: now.Add(2 * time.Second), Line: "pong"}, + }, + }}, + }) + + cancel() + + // This should ensure that everything has been flushed and we have no data races below. + tee.WaitUntilDone() + + req := client.req + reqCtx := client.ctx + + require.NotNil(t, req) + tenant, err := user.ExtractOrgID(reqCtx) + require.NoError(t, err) + + require.Equal(t, "test-tenant", tenant) + + require.Len(t, req.Streams, 3) + + fooBarEntries := []push.Entry{} + pingPongEntries := []push.Entry{} + + for _, stream := range req.Streams { + if stream.Labels == `{foo="bar"}` { + fooBarEntries = append(fooBarEntries, stream.Entries...) + } + + if stream.Labels == `{ping="pong"}` { + pingPongEntries = append(pingPongEntries, stream.Entries...) 
+ } + } + + slices.SortFunc(fooBarEntries, func(i, j push.Entry) int { + return i.Timestamp.Compare(j.Timestamp) + }) + + slices.SortFunc(pingPongEntries, func(i, j push.Entry) int { + return i.Timestamp.Compare(j.Timestamp) + }) + + require.Equal(t, []push.Entry{ + {Timestamp: now, Line: "foo1"}, + {Timestamp: now.Add(1 * time.Second), Line: "bar1"}, + {Timestamp: now.Add(2 * time.Second), Line: "baz1"}, + {Timestamp: now.Add(3 * time.Second), Line: "foo2"}, + {Timestamp: now.Add(4 * time.Second), Line: "bar2"}, + {Timestamp: now.Add(5 * time.Second), Line: "baz2"}, + }, fooBarEntries) + + require.Equal(t, []push.Entry{ + {Timestamp: now.Add(1 * time.Second), Line: "ping"}, + {Timestamp: now.Add(2 * time.Second), Line: "pong"}, + }, pingPongEntries) +} + +func TestPatternTeeEmptyStream(t *testing.T) { + tee, client := getTestTee(t) + + ctx, cancel := context.WithCancel(context.Background()) + + require.NoError(t, tee.Start(ctx)) + + tee.Duplicate("test-tenant", []distributor.KeyedStream{ + {HashKey: 123, Stream: push.Stream{ + Labels: `{foo="bar"}`, + Entries: []push.Entry{}, + }}, + }) + + tee.Duplicate("test-tenant", []distributor.KeyedStream{ + {HashKey: 456, Stream: push.Stream{ + Labels: `{ping="pong"}`, + Entries: []push.Entry{}, + }}, + }) + + cancel() + + // This should ensure that everything has been flushed and we have no data races below. + tee.WaitUntilDone() + + req := client.req + reqCtx := client.ctx + + require.Nil(t, req) + require.Nil(t, reqCtx) +} diff --git a/pkg/util/constants/levels.go b/pkg/util/constants/levels.go new file mode 100644 index 000000000000..df735f84db40 --- /dev/null +++ b/pkg/util/constants/levels.go @@ -0,0 +1,24 @@ +package constants + +const ( + LevelLabel = "detected_level" + LogLevelUnknown = "unknown" + LogLevelDebug = "debug" + LogLevelInfo = "info" + LogLevelWarn = "warn" + LogLevelError = "error" + LogLevelFatal = "fatal" + LogLevelCritical = "critical" + LogLevelTrace = "trace" +) + +var LogLevels = []string{ + LogLevelUnknown, + LogLevelDebug, + LogLevelInfo, + LogLevelWarn, + LogLevelError, + LogLevelFatal, + LogLevelCritical, + LogLevelTrace, +}
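For reference, a minimal sketch of how the settings introduced in this change might be enabled in a Loki YAML config. The top-level keys follow the yaml tags added in this diff (metric_aggregation, tee_config, connection_timeout); the nested metric_aggregation keys and the example values are assumptions based on the aggregation.Config fields and the flag defaults registered above, not copied from any shipped documentation:

pattern_ingester:
  enabled: true
  # Timeout for connections between Loki and the pattern ingester (registered default is 2s).
  connection_timeout: 2s
  metric_aggregation:
    enabled: true
    # Assumed key for aggregation.Config.LokiAddr: the Loki instance that receives aggregated metric samples.
    loki_address: localhost:3100
    # Assumed key for aggregation.Config.DownsamplePeriod: how often per-stream, per-level counters are downsampled.
    downsample_period: 10s
  tee_config:
    batch_size: 5000
    batch_flush_interval: 1s
    flush_queue_size: 1000
    flush_worker_count: 100
    stop_flush_timeout: 30s

With metric_aggregation.enabled set, the ingester loop in this change adds a downsample ticker alongside the flush ticker, and each instance is created with a Push writer that sends the aggregated entries back to the configured Loki address.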