Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shutdown delay and e2e test #3395

Merged
merged 4 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
* [ENHANCEMENT] Update memcached default image in jsonnet for multiple CVE [#3310](https://github.com/grafana/tempo/pull/3310) (@zalegrala)
* [ENHANCEMENT] Add HTML pages /status/overrides and /status/overrides/{tenant} [#3244](https://github.com/grafana/tempo/pull/3244) [#3332](https://github.com/grafana/tempo/pull/3332) (@kvrhdn)
* [ENHANCEMENT] Precalculate and reuse the vParquet3 schema before opening blocks [#3367](https://github.com/grafana/tempo/pull/3367) (@stoewer)
* [ENHANCEMENT] Add `--shutdown-delay` to allow Tempo to cleanly drain connections. [#3395](https://github.com/grafana/tempo/pull/3395) (@joe-elliott)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)
* [BUGFIX] Fixed an issue where cached footers were requested then ignored. [#3196](https://github.com/grafana/tempo/pull/3196) (@joe-elliott)
Expand Down
31 changes: 27 additions & 4 deletions cmd/tempo/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"net/http"
"sort"
"time"

"github.com/go-kit/log/level"
"github.com/gorilla/mux"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/jedib0t/go-pretty/v6/table"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/version"
"go.uber.org/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"gopkg.in/yaml.v3"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/grafana/tempo/pkg/usagestats"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/log"
util_log "github.com/grafana/tempo/pkg/util/log"
)

const (
Expand Down Expand Up @@ -189,17 +192,23 @@ func (t *App) Run() error {
return fmt.Errorf("failed to start service manager: %w", err)
}

// Used to delay shutdown but return "not ready" during this delay.
shutdownRequested := atomic.NewBool(false)
// before starting servers, register /ready handler and gRPC health check service.
if t.cfg.InternalServer.Enable {
t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm))
t.InternalServer.HTTP.Path("/ready").Methods("GET").Handler(t.readyHandler(sm, shutdownRequested))
}

t.Server.HTTP().Path(addHTTPAPIPrefix(&t.cfg, api.PathBuildInfo)).Handler(t.buildinfoHandler()).Methods("GET")

t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm))
t.Server.HTTP().Path("/ready").Handler(t.readyHandler(sm, shutdownRequested))
t.Server.HTTP().Path("/status").Handler(t.statusHandler()).Methods("GET")
t.Server.HTTP().Path("/status/{endpoint}").Handler(t.statusHandler()).Methods("GET")
grpc_health_v1.RegisterHealthServer(t.Server.GRPC(), grpcutil.NewHealthCheck(sm))
grpc_health_v1.RegisterHealthServer(t.Server.GRPC(),
grpcutil.NewHealthCheckFrom(
grpcutil.WithShutdownRequested(shutdownRequested),
grpcutil.WithManager(sm),
))

// Let's listen for events from this manager, and log them.
healthy := func() { level.Info(log.Logger).Log("msg", "Tempo started") }
Expand Down Expand Up @@ -231,6 +240,14 @@ func (t *App) Run() error {
handler := signals.NewHandler(t.Server.Log())
go func() {
handler.Loop()

shutdownRequested.Store(true)
t.Server.SetKeepAlivesEnabled(false)

if t.cfg.ShutdownDelay > 0 {
time.Sleep(t.cfg.ShutdownDelay)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also want to sleep when there are no requests in flight?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about the complexity of that change. This is a very simple one that will do nearly all of what we want, given some config choices in k8s.

}

sm.StopAsync()
}()

Expand Down Expand Up @@ -301,8 +318,14 @@ func (t *App) writeStatusConfig(w io.Writer, r *http.Request) error {
return nil
}

func (t *App) readyHandler(sm *services.Manager) http.HandlerFunc {
func (t *App) readyHandler(sm *services.Manager, shutdownRequested *atomic.Bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if shutdownRequested.Load() {
level.Debug(util_log.Logger).Log("msg", "application is stopping")
http.Error(w, "Application is stopping", http.StatusServiceUnavailable)
return
}

if !sm.IsHealthy() {
msg := bytes.Buffer{}
msg.WriteString("Some services are not Running:\n")
Expand Down
18 changes: 10 additions & 8 deletions cmd/tempo/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (

// Config is the root config for App.
type Config struct {
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`
EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"`
AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"`
Target string `yaml:"target,omitempty"`
AuthEnabled bool `yaml:"auth_enabled,omitempty"`
MultitenancyEnabled bool `yaml:"multitenancy_enabled,omitempty"`
ShutdownDelay time.Duration `yaml:"shutdown_delay,omitempty"`
StreamOverHTTPEnabled bool `yaml:"stream_over_http_enabled,omitempty"`
HTTPAPIPrefix string `yaml:"http_api_prefix"`
UseOTelTracer bool `yaml:"use_otel_tracer,omitempty"`
EnableGoRuntimeMetrics bool `yaml:"enable_go_runtime_metrics,omitempty"`
AutocompleteFilteringEnabled bool `yaml:"autocomplete_filtering_enabled,omitempty"`

Server server.Config `yaml:"server,omitempty"`
InternalServer internalserver.Config `yaml:"internal_server,omitempty"`
Expand Down Expand Up @@ -76,6 +77,7 @@ func (c *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
f.BoolVar(&c.UseOTelTracer, "use-otel-tracer", false, "Set to true to replace the OpenTracing tracer with the OpenTelemetry tracer")
f.BoolVar(&c.EnableGoRuntimeMetrics, "enable-go-runtime-metrics", false, "Set to true to enable all Go runtime metrics")
f.BoolVar(&c.AutocompleteFilteringEnabled, "autocomplete-filtering.enabled", true, "Set to false to disable autocomplete filtering")
f.DurationVar(&c.ShutdownDelay, "shutdown-delay", 0, "How long to wait between SIGTERM and shutdown. After receiving SIGTERM, Tempo will report not-ready status via /ready endpoint.")
electron0zero marked this conversation as resolved.
Show resolved Hide resolved

// Server settings
flagext.DefaultValues(&c.Server)
Expand Down
5 changes: 5 additions & 0 deletions cmd/tempo/app/server_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type TempoServer interface {
GRPC() *grpc.Server
Log() log.Logger
EnableHTTP2()
SetKeepAlivesEnabled(enabled bool)

StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error)
}
Expand Down Expand Up @@ -62,6 +63,10 @@ func (s *tempoServer) EnableHTTP2() {
})
}

// SetKeepAlivesEnabled toggles HTTP keep-alives on the external (public)
// server. It is used during graceful shutdown so that clients stop reusing
// existing connections to this instance; see net/http.Server.SetKeepAlivesEnabled.
func (s *tempoServer) SetKeepAlivesEnabled(enabled bool) {
	// Delegate directly to the underlying net/http server of the external server.
	s.externalServer.HTTPServer.SetKeepAlivesEnabled(enabled)
}

func (s *tempoServer) StartAndReturnService(cfg server.Config, supportGRPCOnHTTP bool, servicesToWaitFor func() []services.Service) (services.Service, error) {
var err error

Expand Down
47 changes: 47 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,53 @@ func TestMicroservicesWithKVStores(t *testing.T) {
}
}

// TestShutdownDelay verifies that when Tempo is started with -shutdown-delay,
// the /ready endpoint begins returning 503 Service Unavailable after a stop is
// requested, while the process is still draining connections.
func TestShutdownDelay(t *testing.T) {
	s, err := e2e.NewScenario("tempo_e2e")
	require.NoError(t, err)
	defer s.Close()

	// set up the backend
	cfg := app.Config{}
	buff, err := os.ReadFile(configAllInOneS3)
	require.NoError(t, err)
	err = yaml.UnmarshalStrict(buff, &cfg)
	require.NoError(t, err)
	_, err = backend.New(s, cfg)
	require.NoError(t, err)

	require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneS3, "config.yaml"))
	tempo := util.NewTempoAllInOne("-shutdown-delay=5s")

	// this confirms that the readiness flag is up
	require.NoError(t, s.StartAndWaitReady(tempo))

	// if we're here the readiness flag is up. now call kill and check the readiness flag goes down.
	// NOTE: require must not be used inside this goroutine — require calls t.FailNow(),
	// which is only valid from the goroutine running the test. t.Error is goroutine-safe.
	wg := sync.WaitGroup{}
	wg.Add(1)
	go func() {
		defer wg.Done()

		for i := 0; i < 10; i++ {
			res, err := e2e.DoGet("http://" + tempo.Endpoint(3200) + "/ready")
			if err != nil {
				t.Error("failed to poll /ready:", err)
				return
			}
			res.Body.Close()

			if res.StatusCode == http.StatusServiceUnavailable {
				// found it!
				return
			}
			time.Sleep(time.Second)
		}

		t.Error("readiness flag never went down")
	}()

	// call stop and allow the goroutine above to observe an unavailable readiness flag
	_ = tempo.Stop()

	wg.Wait()
}

func TestScalableSingleBinary(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
Expand Down
Loading