diff --git a/CHANGELOG.md b/CHANGELOG.md index edabaa89242..c4cd9e08a4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## main / unreleased - +* [FEATURE] Add configuration on tempo-query plugin for fetch services older than complete_block_timeout [#3262](https://github.com/grafana/tempo/pull/3350) (@rubenvp8510) * [FEATURE] Add support for multi-tenant queries in streaming search [#3262](https://github.com/grafana/tempo/pull/3262) (@electron0zero) * [FEATURE] TraceQL metrics queries [#3227](https://github.com/grafana/tempo/pull/3227) [#3252](https://github.com/grafana/tempo/pull/3252) [#3258](https://github.com/grafana/tempo/pull/3258) (@mdisibio @zalegrala) * [FEATURE] Add support for multi-tenant queries. [#3087](https://github.com/grafana/tempo/pull/3087) (@electron0zero) diff --git a/cmd/tempo-query/tempo/config.go b/cmd/tempo-query/tempo/config.go index accf5a15586..74a2567f7e5 100644 --- a/cmd/tempo-query/tempo/config.go +++ b/cmd/tempo-query/tempo/config.go @@ -8,10 +8,11 @@ import ( // Config holds the configuration for redbull. type Config struct { - Backend string `yaml:"backend"` - TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` - TLS tls.ClientConfig `yaml:",inline"` - TenantHeaderKey string `yaml:"tenant_header_key"` + Backend string `yaml:"backend"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` + TLS tls.ClientConfig `yaml:",inline"` + TenantHeaderKey string `yaml:"tenant_header_key"` + QueryServicesDuration string `yaml:"services_query_duration"` } // InitFromViper initializes the options struct with values from Viper @@ -25,6 +26,7 @@ func (c *Config) InitFromViper(v *viper.Viper) { c.TLS.InsecureSkipVerify = v.GetBool("tls_insecure_skip_verify") c.TLS.CipherSuites = v.GetString("tls_cipher_suites") c.TLS.MinVersion = v.GetString("tls_min_version") + c.QueryServicesDuration = v.GetString("services_query_duration") tenantHeader := v.GetString("tenant_header_key") if tenantHeader == "" { diff --git a/cmd/tempo-query/tempo/plugin.go b/cmd/tempo-query/tempo/plugin.go index 97b4c5994f7..b045c2b94f3 100644 --- a/cmd/tempo-query/tempo/plugin.go +++ b/cmd/tempo-query/tempo/plugin.go @@ -53,11 +53,12 @@ var tlsVersions = map[string]uint16{ } type Backend struct { - tempoBackend string - tlsEnabled bool - tls tlsCfg.ClientConfig - httpClient *http.Client - tenantHeaderKey string + tempoBackend string + tlsEnabled bool + tls tlsCfg.ClientConfig + httpClient *http.Client + tenantHeaderKey string + QueryServicesDuration *time.Duration } func New(cfg *Config) (*Backend, error) { @@ -65,12 +66,25 @@ func New(cfg *Config) (*Backend, error) { if err != nil { return nil, err } + + var queryServiceDuration *time.Duration + + if cfg.QueryServicesDuration != "" { + queryDuration, err := time.ParseDuration(cfg.QueryServicesDuration) + if err != nil { + return nil, err + } + queryServiceDuration = &queryDuration + + } + return &Backend{ - tempoBackend: cfg.Backend, - tlsEnabled: cfg.TLSEnabled, - tls: cfg.TLS, - httpClient: httpClient, - tenantHeaderKey: cfg.TenantHeaderKey, + tempoBackend: cfg.Backend, + tlsEnabled: cfg.TLSEnabled, + tls: cfg.TLS, + httpClient: httpClient, + tenantHeaderKey: cfg.TenantHeaderKey, + QueryServicesDuration: queryServiceDuration, }, nil } @@ -229,6 +243,12 @@ func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger return jaegerTrace, nil } +func (b *Backend) calculateTimeRange() (int64, int64) { + now := time.Now() + start := now.Add(*b.QueryServicesDuration * -1) + return start.Unix(), now.Unix() +} + func (b *Backend) GetServices(ctx context.Context) ([]string, error) { span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations") defer span.Finish() @@ -379,7 +399,14 @@ func createTagsQueryParam(service string, operation string, tags map[string]stri } func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, tagName string) ([]string, error) { - url := fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName) + var url string + + if b.QueryServicesDuration == nil { + url = fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName) + } else { + startTime, endTime := b.calculateTimeRange() + url = fmt.Sprintf("%s://%s/api/search/tag/%s/values?start=%d&end=%d", b.apiSchema(), b.tempoBackend, tagName, startTime, endTime) + } req, err := b.newGetRequest(ctx, url, span) if err != nil { diff --git a/integration/e2e/config-plugin-test.yaml b/integration/e2e/config-plugin-test.yaml index 4bfd8f8bee2..3858387acdf 100644 --- a/integration/e2e/config-plugin-test.yaml +++ b/integration/e2e/config-plugin-test.yaml @@ -4,6 +4,10 @@ server: http_listen_port: 3200 log_level: debug +query_frontend: + search: + query_backend_after: 1s + distributor: receivers: jaeger: @@ -38,3 +42,6 @@ storage: pool: max_workers: 10 queue_depth: 100 + block: + version: vParquet3 + blocklist_poll: 1s diff --git a/integration/e2e/config-tempo-query.yaml b/integration/e2e/config-tempo-query.yaml index 7cdb449690e..f5d50e5b262 100644 --- a/integration/e2e/config-tempo-query.yaml +++ b/integration/e2e/config-tempo-query.yaml @@ -1 +1,2 @@ backend: tempo:3200 +services_query_duration: 1h diff --git a/integration/e2e/query_plugin_test.go b/integration/e2e/query_plugin_test.go index f403a64fc47..2f96986d5b0 100644 --- a/integration/e2e/query_plugin_test.go +++ b/integration/e2e/query_plugin_test.go @@ -73,6 +73,46 @@ func TestSearchUsingJaegerPlugin(t *testing.T) { callJaegerQuerySearchTraceAssert(t, tempoQuery, "execute", "backend") } +func TestSearchUsingBackendTagsService(t *testing.T) { + s, err := e2e.NewScenario("tempo_query_plugin_backend_e2e") + require.NoError(t, err) + defer s.Close() + + require.NoError(t, util.CopyFileToSharedDir(s, "config-plugin-test.yaml", "config.yaml")) + require.NoError(t, util.CopyFileToSharedDir(s, "config-tempo-query.yaml", "config-tempo-query.yaml")) + + tempo := util.NewTempoAllInOne() + tempoQuery := util.NewTempoQuery() + + require.NoError(t, s.StartAndWaitReady(tempo)) + require.NoError(t, s.StartAndWaitReady(tempoQuery)) + + jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250)) + require.NoError(t, err) + require.NotNil(t, jaegerClient) + + batch := makeThriftBatchWithSpanCountForServiceAndOp(2, "execute", "backend") + require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) + + batch = makeThriftBatchWithSpanCountForServiceAndOp(2, "request", "frontend") + require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch)) + + // Wait for the traces to be written to the WAL + time.Sleep(time.Second * 3) + + callFlush(t, tempo) + time.Sleep(time.Second * 1) + callFlush(t, tempo) + + callJaegerQuerySearchServicesAssert(t, tempoQuery, servicesOrOpJaegerQueryResponse{ + Data: []string{ + "frontend", + "backend", + }, + Total: 2, + }) +} + func callJaegerQuerySearchServicesAssert(t *testing.T, svc *e2e.HTTPService, expected servicesOrOpJaegerQueryResponse) { // search for tag values req, err := http.NewRequest(http.MethodGet, "http://"+svc.Endpoint(16686)+"/api/services", nil)