Skip to content

Commit

Permalink
Add configuration for fetch services older than complete_block_timeout (
Browse files Browse the repository at this point in the history
#3350)

Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
  • Loading branch information
rubenvp8510 committed Feb 2, 2024
1 parent c1f9fd9 commit 08d9be8
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 16 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
## main / unreleased

* [FEATURE] Add configuration on tempo-query plugin for fetch services older than complete_block_timeout [#3262](https://github.com/grafana/tempo/pull/3350) (@rubenvp8510)
* [FEATURE] Add support for multi-tenant queries in streaming search [#3262](https://github.com/grafana/tempo/pull/3262) (@electron0zero)
* [FEATURE] TraceQL metrics queries [#3227](https://github.com/grafana/tempo/pull/3227) [#3252](https://github.com/grafana/tempo/pull/3252) [#3258](https://github.com/grafana/tempo/pull/3258) (@mdisibio @zalegrala)
* [FEATURE] Add support for multi-tenant queries. [#3087](https://github.com/grafana/tempo/pull/3087) (@electron0zero)
Expand Down
10 changes: 6 additions & 4 deletions cmd/tempo-query/tempo/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (

// Config holds the configuration for redbull.
type Config struct {
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
Backend string `yaml:"backend"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TenantHeaderKey string `yaml:"tenant_header_key"`
QueryServicesDuration string `yaml:"services_query_duration"`
}

// InitFromViper initializes the options struct with values from Viper
Expand All @@ -25,6 +26,7 @@ func (c *Config) InitFromViper(v *viper.Viper) {
c.TLS.InsecureSkipVerify = v.GetBool("tls_insecure_skip_verify")
c.TLS.CipherSuites = v.GetString("tls_cipher_suites")
c.TLS.MinVersion = v.GetString("tls_min_version")
c.QueryServicesDuration = v.GetString("services_query_duration")

tenantHeader := v.GetString("tenant_header_key")
if tenantHeader == "" {
Expand Down
49 changes: 38 additions & 11 deletions cmd/tempo-query/tempo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,38 @@ var tlsVersions = map[string]uint16{
}

type Backend struct {
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
tempoBackend string
tlsEnabled bool
tls tlsCfg.ClientConfig
httpClient *http.Client
tenantHeaderKey string
QueryServicesDuration *time.Duration
}

func New(cfg *Config) (*Backend, error) {
httpClient, err := createHTTPClient(cfg)
if err != nil {
return nil, err
}

var queryServiceDuration *time.Duration

if cfg.QueryServicesDuration != "" {
queryDuration, err := time.ParseDuration(cfg.QueryServicesDuration)
if err != nil {
return nil, err
}
queryServiceDuration = &queryDuration

}

return &Backend{
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
tempoBackend: cfg.Backend,
tlsEnabled: cfg.TLSEnabled,
tls: cfg.TLS,
httpClient: httpClient,
tenantHeaderKey: cfg.TenantHeaderKey,
QueryServicesDuration: queryServiceDuration,
}, nil
}

Expand Down Expand Up @@ -229,6 +243,12 @@ func (b *Backend) GetTrace(ctx context.Context, traceID jaeger.TraceID) (*jaeger
return jaegerTrace, nil
}

func (b *Backend) calculateTimeRange() (int64, int64) {
now := time.Now()
start := now.Add(*b.QueryServicesDuration * -1)
return start.Unix(), now.Unix()
}

func (b *Backend) GetServices(ctx context.Context) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "tempo-query.GetOperations")
defer span.Finish()
Expand Down Expand Up @@ -379,7 +399,14 @@ func createTagsQueryParam(service string, operation string, tags map[string]stri
}

func (b *Backend) lookupTagValues(ctx context.Context, span opentracing.Span, tagName string) ([]string, error) {
url := fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName)
var url string

if b.QueryServicesDuration == nil {
url = fmt.Sprintf("%s://%s/api/search/tag/%s/values", b.apiSchema(), b.tempoBackend, tagName)
} else {
startTime, endTime := b.calculateTimeRange()
url = fmt.Sprintf("%s://%s/api/search/tag/%s/values?start=%d&end=%d", b.apiSchema(), b.tempoBackend, tagName, startTime, endTime)
}

req, err := b.newGetRequest(ctx, url, span)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions integration/e2e/config-plugin-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ server:
http_listen_port: 3200
log_level: debug

query_frontend:
search:
query_backend_after: 1s

distributor:
receivers:
jaeger:
Expand Down Expand Up @@ -38,3 +42,6 @@ storage:
pool:
max_workers: 10
queue_depth: 100
block:
version: vParquet3
blocklist_poll: 1s
1 change: 1 addition & 0 deletions integration/e2e/config-tempo-query.yaml
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
backend: tempo:3200
services_query_duration: 1h
40 changes: 40 additions & 0 deletions integration/e2e/query_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,46 @@ func TestSearchUsingJaegerPlugin(t *testing.T) {
callJaegerQuerySearchTraceAssert(t, tempoQuery, "execute", "backend")
}

func TestSearchUsingBackendTagsService(t *testing.T) {
s, err := e2e.NewScenario("tempo_query_plugin_backend_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, "config-plugin-test.yaml", "config.yaml"))
require.NoError(t, util.CopyFileToSharedDir(s, "config-tempo-query.yaml", "config-tempo-query.yaml"))

tempo := util.NewTempoAllInOne()
tempoQuery := util.NewTempoQuery()

require.NoError(t, s.StartAndWaitReady(tempo))
require.NoError(t, s.StartAndWaitReady(tempoQuery))

jaegerClient, err := util.NewJaegerGRPCClient(tempo.Endpoint(14250))
require.NoError(t, err)
require.NotNil(t, jaegerClient)

batch := makeThriftBatchWithSpanCountForServiceAndOp(2, "execute", "backend")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

batch = makeThriftBatchWithSpanCountForServiceAndOp(2, "request", "frontend")
require.NoError(t, jaegerClient.EmitBatch(context.Background(), batch))

// Wait for the traces to be written to the WAL
time.Sleep(time.Second * 3)

callFlush(t, tempo)
time.Sleep(time.Second * 1)
callFlush(t, tempo)

callJaegerQuerySearchServicesAssert(t, tempoQuery, servicesOrOpJaegerQueryResponse{
Data: []string{
"frontend",
"backend",
},
Total: 2,
})
}

func callJaegerQuerySearchServicesAssert(t *testing.T, svc *e2e.HTTPService, expected servicesOrOpJaegerQueryResponse) {
// search for tag values
req, err := http.NewRequest(http.MethodGet, "http://"+svc.Endpoint(16686)+"/api/services", nil)
Expand Down

0 comments on commit 08d9be8

Please sign in to comment.