Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration for fetch services older than complete_block_timeout #3350

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
## main / unreleased

* [FEATURE] Add configuration on tempo-query plugin for fetch services older than complete_block_timeout [#3262](https://github.com/grafana/tempo/pull/3350) (@rubenvp8510)
* [FEATURE] Add support for multi-tenant queries in streaming search [#3262](https://github.com/grafana/tempo/pull/3262) (@electron0zero)
* [FEATURE] TraceQL metrics queries [#3227](https://github.com/grafana/tempo/pull/3227) [#3252](https://github.com/grafana/tempo/pull/3252) [#3258](https://github.com/grafana/tempo/pull/3258) (@mdisibio @zalegrala)
* [FEATURE] Add support for multi-tenant queries. [#3087](https://github.com/grafana/tempo/pull/3087) (@electron0zero)
Expand Down
10 changes: 6 additions & 4 deletions cmd/tempo-query/tempo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (

// Config holds the configuration for redbull.
type Config struct {
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
QueryServicesDuration string `yaml:"services_query_duration"`
}

// InitFromViper initializes the options struct with values from Viper
Expand All @@ -25,6 +26,7 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.TLS.InsecureSkipVerify = v.GetBool("tls_insecure_skip_verify")
c.TLS.CipherSuites = v.GetString("tls_cipher_suites")
c.TLS.MinVersion = v.GetString("tls_min_version")
c.QueryServicesDuration = v.GetString("services_query_duration")

tenantHeader := v.GetString("tenant_header_key")
if tenantHeader == "" {
Expand Down
49 changes: 38 additions & 11 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,38 @@ var tlsVersions = map[string]uint16{
}

type Backend struct {
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
QueryServicesDuration *time.Duration
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
}

func New(cfg *Config) (*Backend, error) {
httpClient, err := createHTTPClient(cfg)
if err != nil {
return nil, err
}

var queryServiceDuration *time.Duration

if cfg.QueryServicesDuration != "" {
queryDuration, err := time.ParseDuration(cfg.QueryServicesDuration)
if err != nil {
return nil, err
}
queryServiceDuration = &queryDuration

}

return &Backend{
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
QueryServicesDuration: queryServiceDuration,
}, nil
}

Expand Down Expand Up @@ -229,6 +243,12 @@ func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger
return jaegerTrace, nil
}

func (b *Backend) calculateTimeRange() (int64, int64) {
now := time.Now()
start := now.Add(*b.QueryServicesDuration * -1)
return start.Unix(), now.Unix()
}

func (b *Backend) GetServices(ctx context.Context) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()
Expand Down Expand Up @@ -379,7 +399,14 @@ func createTagsQueryParam(service string, operation string, tags map[string]stri
}

func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, tagName string) ([]string, error) {
url := fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName)
var url string

if b.QueryServicesDuration == nil {
url = fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName)
} else {
startTime, endTime := b.calculateTimeRange()
url = fmt.Sprintf("%s://%s/api/search/tag/%s/values?start=%d&end=%d", b.apiSchema(), b.tempoBackend, tagName, startTime, endTime)
joe-elliott marked this conversation as resolved.
Show resolved Hide resolved
}

req, err := b.newGetRequest(ctx, url, span)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions integration/e2e/config-plugin-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ server:
http_listen_port: 3200
log_level: debug

query_frontend:
search:
query_backend_after: 1s

distributor:
receivers:
jaeger:
Expand Down Expand Up @@ -38,3 +42,6 @@ storage:
pool:
max_workers: 10
queue_depth: 100
block:
version: vParquet3
blocklist_poll: 1s
1 change: 1 addition & 0 deletions integration/e2e/config-tempo-query.yaml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
backend: tempo:3200
services_query_duration: 1h
40 changes: 40 additions & 0 deletions integration/e2e/query_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,46 @@ func TestSearchUsingJaegerPlugin(t *testing.T) {
callJaegerQuerySearchTraceAssert(t, tempoQuery, "execute", "backend")
}

func TestSearchUsingBackendTagsService(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice addition!

s, err := e2e.NewScenario("tempo_query_plugin_backend_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, "config-plugin-test.yaml", "config.yaml"))
require.NoError(t, util.CopyFileToSharedDir(s, "config-tempo-query.yaml", "config-tempo-query.yaml"))

tempo := util.NewTempoAllInOne()
tempoQuery := util.NewTempoQuery()

require.NoError(t, s.StartAndWaitReady(tempo))
require.NoError(t, s.StartAndWaitReady(tempoQuery))

jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, jaegerClient)

batch := makeThriftBatchWithSpanCountForServiceAndOp(2, "execute", "backend")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountForServiceAndOp(2, "request", "frontend")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
time.Sleep(time.Second * 3)

callFlush(t, tempo)
time.Sleep(time.Second * 1)
callFlush(t, tempo)

callJaegerQuerySearchServicesAssert(t, tempoQuery, servicesOrOpJaegerQueryResponse{
Data: []string{
"frontend",
"backend",
},
Total: 2,
})
}

func callJaegerQuerySearchServicesAssert(t *testing.T, svc *e2e.HTTPService, expected servicesOrOpJaegerQueryResponse) {
// search for tag values
req, err := http.NewRequest(http.MethodGet, "http://"+svc.Endpoint(16686)+"/api/services", nil)
Expand Down
Loading