diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a683cdc1ed..dd2616b0f47 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ * [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala) * [ENHANCEMENT] Add HTML pages /status/overrides and /status/overrides/{tenant} [#3244](https://github.com/grafana/tempo/pull/3244) [#3332](https://github.com/grafana/tempo/pull/3332) (@kvrhdn) * [ENHANCEMENT] Precalculate and reuse the vParquet3 schema before opening blocks [#3367](https://github.com/grafana/tempo/pull/3367) (@stoewer) +* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#3395](https://github.com/grafana/tempo/pull/3395) (@joe-elliott) * [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno) * [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno) * [BUGFIX] Fixed an issue where cached footers were requested then ignored. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott) diff --git a/cmd/tempo/app/app.go b/cmd/tempo/app/app.go index 3a5f3a47305..9a6f5156a5b 100644 --- a/cmd/tempo/app/app.go +++ b/cmd/tempo/app/app.go @@ -9,6 +9,7 @@ import ( "io" "net/http" "sort" + "time" "github.com/go-kit/log/level" "github.com/gorilla/mux" @@ -23,6 +24,7 @@ import ( "github.com/jedib0t/go-pretty/v6/table" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/version" + "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" "gopkg.in/yaml.v3" @@ -42,6 +44,7 @@ import ( "github.com/grafana/tempo/pkg/usagestats" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/log" + util_log "github.com/grafana/tempo/pkg/util/log" ) const ( @@ -189,17 +192,23 @@ func (t *App) Run() error { return fmt.Errorf("failed to start service manager: %w", err) } + // Used to delay shutdown but return "not ready" during this delay. + shutdownRequested := atomic.NewBool(false) // before starting servers, register /ready handler and gRPC health check service. if t.cfg.InternalServer.Enable { - t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm)) + t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm, shutdownRequested)) } t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET") - t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm)) + t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm, shutdownRequested)) t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET") t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET") - grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm)) + grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), + grpcutil.NewHealthCheckFrom( + grpcutil.WithShutdownRequested(shutdownRequested), + grpcutil.WithManager(sm), + )) // Let's listen for events from this manager, and log them. healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") } @@ -231,6 +240,14 @@ func (t *App) Run() error { handler := signals.NewHandler(t.Server.Log()) go func() { handler.Loop() + + shutdownRequested.Store(true) + t.Server.SetKeepAlivesEnabled(false) + + if t.cfg.ShutdownDelay > 0 { + time.Sleep(t.cfg.ShutdownDelay) + } + sm.StopAsync() }() @@ -301,8 +318,14 @@ func (t *App) writeStatusConfig(w io.Writer, r *http.Request) error { return nil } -func (t *App) readyHandler(sm *services.Manager) http.HandlerFunc { +func (t *App) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + if shutdownRequested.Load() { + level.Debug(util_log.Logger).Log("msg", "application is stopping") + http.Error(w, "Application is stopping", http.StatusServiceUnavailable) + return + } + if !sm.IsHealthy() { msg := bytes.Buffer{} msg.WriteString("Some services are not Running:\n") diff --git a/cmd/tempo/app/config.go b/cmd/tempo/app/config.go index 9a8490800aa..bfcad225b47 100644 --- a/cmd/tempo/app/config.go +++ b/cmd/tempo/app/config.go @@ -31,14 +31,15 @@ import ( // Config is the root config for App. type Config struct { - Target string `yaml:"target,omitempty"` - AuthEnabled bool `yaml:"auth_enabled,omitempty"` - MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"` - StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"` - HTTPAPIPrefix string `yaml:"http_api_prefix"` - UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"` - EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"` - AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"` + Target string `yaml:"target,omitempty"` + AuthEnabled bool `yaml:"auth_enabled,omitempty"` + MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"` + ShutdownDelay time.Duration `yaml:"shutdown_delay,omitempty"` + StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"` + HTTPAPIPrefix string `yaml:"http_api_prefix"` + UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"` + EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"` + AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"` Server server.Config `yaml:"server,omitempty"` InternalServer internalserver.Config `yaml:"internal_server,omitempty"` @@ -76,6 +77,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) { f.BoolVar(&c.UseOTelTracer, "use-otel-tracer", false, "Set to true to replace the OpenTracing tracer with the OpenTelemetry tracer") f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics") f.BoolVar(&c.AutocompleteFilteringEnabled, "autocomplete-filtering.enabled", true, "Set to false to disable autocomplete filtering") + f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Tempo will report not-ready status via /ready endpoint.") // Server settings flagext.DefaultValues(&c.Server) diff --git a/cmd/tempo/app/server_service.go b/cmd/tempo/app/server_service.go index 93dd2a7acdd..dd70c30d395 100644 --- a/cmd/tempo/app/server_service.go +++ b/cmd/tempo/app/server_service.go @@ -25,6 +25,7 @@ type TempoServer interface { GRPC() *grpc.Server Log() log.Logger EnableHTTP2() + SetKeepAlivesEnabled(enabled bool) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) } @@ -62,6 +63,10 @@ func (s *tempoServer) EnableHTTP2() { }) } +func (s *tempoServer) SetKeepAlivesEnabled(enabled bool) { + s.externalServer.HTTPServer.SetKeepAlivesEnabled(enabled) +} + func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) { var err error diff --git a/integration/e2e/e2e_test.go b/integration/e2e/e2e_test.go index be20269eca1..5fe1bff8349 100644 --- a/integration/e2e/e2e_test.go +++ b/integration/e2e/e2e_test.go @@ -312,6 +312,53 @@ func TestMicroservicesWithKVStores(t *testing.T) { } } +func TestShutdownDelay(t *testing.T) { + s, err := e2e.NewScenario("tempo_e2e") + require.NoError(t, err) + defer s.Close() + + // set up the backend + cfg := app.Config{} + buff, err := os.ReadFile(configAllInOneS3) + require.NoError(t, err) + err = yaml.UnmarshalStrict(buff, &cfg) + require.NoError(t, err) + _, err = backend.New(s, cfg) + require.NoError(t, err) + + require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml")) + tempo := util.NewTempoAllInOne("-shutdown-delay=5s") + + // this line tests confirms that the readiness flag is up + require.NoError(t, s.StartAndWaitReady(tempo)) + + // if we're here the readiness flag is up. now call kill and check the readiness flag is down + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + for i := 0; i < 10; i++ { + res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready") + require.NoError(t, err) + res.Body.Close() + + if res.StatusCode == http.StatusServiceUnavailable { + // found it! + return + } + time.Sleep(time.Second) + } + + require.Fail(t, "readiness flag never went down") + }() + + // call stop and allow the code above to test for a unavailable readiness flag + _ = tempo.Stop() + + wg.Wait() +} + func TestScalableSingleBinary(t *testing.T) { s, err := e2e.NewScenario("tempo_e2e") require.NoError(t, err)