From a8ca06aacbdc321d7698ea83ca00de752c9e5280 Mon Sep 17 00:00:00 2001 From: Andrii Korotkov Date: Wed, 27 Nov 2024 22:21:18 -0800 Subject: [PATCH] fix: Graceful shutdown for the API server (#18642) Closes #18642 Implements a graceful shutdown for the API server. Without this, ArgoCD API server will eventually return 502 during a rolling update. However, healthcheck would return 503 if the server is terminating. Signed-off-by: Andrii Korotkov Co-authored-by: Leonardo Luz Almeida Co-authored-by: Michael Crenshaw <350466+crenshaw-dev@users.noreply.github.com> --- cmd/argocd-server/commands/argocd_server.go | 3 + server/server.go | 163 ++++++++++++++++---- server/server_test.go | 69 +++++++++ test/e2e/aa_graceful_restart_test.go | 27 ++++ test/e2e/fixture/fixture.go | 36 ++--- 5 files changed, 246 insertions(+), 52 deletions(-) create mode 100644 test/e2e/aa_graceful_restart_test.go diff --git a/cmd/argocd-server/commands/argocd_server.go b/cmd/argocd-server/commands/argocd_server.go index 3de5d517e7d233..453d8f0d03dd69 100644 --- a/cmd/argocd-server/commands/argocd_server.go +++ b/cmd/argocd-server/commands/argocd_server.go @@ -273,6 +273,9 @@ func NewCommand() *cobra.Command { if closer != nil { closer() } + if argocd.TerminateRequested() { + break + } } }, Example: templates.Examples(` diff --git a/server/server.go b/server/server.go index 6625461dfab030..e961566a050783 100644 --- a/server/server.go +++ b/server/server.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "os/exec" + "os/signal" "path" "path/filepath" "reflect" @@ -20,6 +21,8 @@ import ( go_runtime "runtime" "strings" gosync "sync" + "sync/atomic" + "syscall" "time" // nolint:staticcheck @@ -187,17 +190,21 @@ type ArgoCDServer struct { db db.ArgoDB // stopCh is the channel which when closed, will shutdown the Argo CD server - stopCh chan struct{} - userStateStorage util_session.UserStateStorage - indexDataInit gosync.Once - indexData []byte - indexDataErr error - staticAssets http.FileSystem - apiFactory 
api.Factory - secretInformer cache.SharedIndexInformer - configMapInformer cache.SharedIndexInformer - serviceSet *ArgoCDServiceSet - extensionManager *extension.Manager + stopCh chan os.Signal + userStateStorage util_session.UserStateStorage + indexDataInit gosync.Once + indexData []byte + indexDataErr error + staticAssets http.FileSystem + apiFactory api.Factory + secretInformer cache.SharedIndexInformer + configMapInformer cache.SharedIndexInformer + serviceSet *ArgoCDServiceSet + extensionManager *extension.Manager + shutdown func() + terminateRequested atomic.Bool + receivedSignal atomic.Bool + available atomic.Bool } type ArgoCDServerOpts struct { @@ -329,6 +336,9 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio pg := extension.NewDefaultProjectGetter(projLister, dbInstance) ug := extension.NewDefaultUserGetter(policyEnf) em := extension.NewManager(logger, opts.Namespace, sg, ag, pg, enf, ug) + noopShutdown := func() { + log.Error("API Server Shutdown function called but server is not started yet.") + } a := &ArgoCDServer{ ArgoCDServerOpts: opts, @@ -352,6 +362,8 @@ func NewServer(ctx context.Context, opts ArgoCDServerOpts, appsetOpts Applicatio secretInformer: secretInformer, configMapInformer: configMapInformer, extensionManager: em, + shutdown: noopShutdown, + stopCh: make(chan os.Signal, 1), } err = a.logInClusterWarnings() @@ -369,6 +381,12 @@ const ( ) func (a *ArgoCDServer) healthCheck(r *http.Request) error { + if a.terminateRequested.Load() { + return errors.New("API Server is terminating and unable to serve requests.") + } + if !a.available.Load() { + return errors.New("API Server is not available. 
It either hasn't started or is restarting.") + } if val, ok := r.URL.Query()["full"]; ok && len(val) > 0 && val[0] == "true" { argoDB := db.NewDB(a.Namespace, a.settingsMgr, a.KubeClientset) _, err := argoDB.ListClusters(r.Context()) @@ -601,35 +619,116 @@ func (a *ArgoCDServer) Run(ctx context.Context, listeners *Listeners) { log.Fatal("Timed out waiting for project cache to sync") } - a.stopCh = make(chan struct{}) - <-a.stopCh + shutdownFunc := func() { + log.Info("API Server shutdown initiated. Shutting down servers...") + a.available.Store(false) + sCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + var wg gosync.WaitGroup + + // Shutdown http server + wg.Add(1) + go func() { + defer wg.Done() + err := httpS.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down http server: %s", err) + } + }() + + if httpsS != nil { + // Shutdown https server + wg.Add(1) + go func() { + defer wg.Done() + err := httpsS.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down https server: %s", err) + } + }() + } + + // Shutdown gRPC server + wg.Add(1) + go func() { + defer wg.Done() + grpcS.GracefulStop() + }() + + // Shutdown metrics server + wg.Add(1) + go func() { + defer wg.Done() + err := metricsServ.Shutdown(sCtx) + if err != nil { + log.Errorf("Error shutting down metrics server: %s", err) + } + }() + + if tlsm != nil { + // Shutdown tls server + wg.Add(1) + go func() { + defer wg.Done() + tlsm.Close() + }() + } + + // Shutdown tcp server + wg.Add(1) + go func() { + defer wg.Done() + tcpm.Close() + }() + + c := make(chan struct{}) + // This goroutine will wait for all servers to conclude the shutdown + // process + go func() { + defer close(c) + wg.Wait() + }() + + select { + case <-c: + log.Info("All servers were gracefully shutdown. Exiting...") + case <-sCtx.Done(): + log.Warn("Graceful shutdown timeout. 
Exiting...") + } + } + a.shutdown = shutdownFunc + signal.Notify(a.stopCh, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + a.available.Store(true) + + select { + case signal := <-a.stopCh: + log.Infof("API Server received signal: %s", signal.String()) + a.terminateRequested.Store(true) + a.receivedSignal.Store(true) + a.shutdown() + case <-ctx.Done(): + log.Infof("API Server: %s", ctx.Err()) + a.terminateRequested.Store(true) + a.shutdown() + } } func (a *ArgoCDServer) Initialized() bool { return a.projInformer.HasSynced() && a.appInformer.HasSynced() } +// TerminateRequested returns whether a shutdown was initiated by a signal or context cancel +// as opposed to a watch. +func (a *ArgoCDServer) TerminateRequested() bool { + return a.terminateRequested.Load() +} + // checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown func (a *ArgoCDServer) checkServeErr(name string, err error) { - if err != nil { - if a.stopCh == nil { - // a nil stopCh indicates a graceful shutdown - log.Infof("graceful shutdown %s: %v", name, err) - } else { - log.Fatalf("%s: %v", name, err) - } + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Errorf("Error received from server %s: %v", name, err) } else { - log.Infof("graceful shutdown %s", name) - } -} - -// Shutdown stops the Argo CD server -func (a *ArgoCDServer) Shutdown() { - log.Info("Shut down requested") - stopCh := a.stopCh - a.stopCh = nil - if stopCh != nil { - close(stopCh) + log.Infof("Graceful shutdown of %s initiated", name) } } @@ -734,7 +833,7 @@ func (a *ArgoCDServer) watchSettings() { } } log.Info("shutting down settings watch") - a.Shutdown() + a.shutdown() a.settingsMgr.Unsubscribe(updateCh) close(updateCh) } diff --git a/server/server_test.go b/server/server_test.go index 1f715d00d4e918..838f9405de2f74 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -10,6 +10,8 @@ import ( "os" "path/filepath" "strings" + gosync "sync" + "syscall" "testing" "time" 
@@ -419,6 +421,73 @@ func TestCertsAreNotGeneratedInInsecureMode(t *testing.T) { assert.Nil(t, s.settings.Certificate) } +func TestGracefulShutdown(t *testing.T) { + port, err := test.GetFreePort() + require.NoError(t, err) + mockRepoClient := &mocks.Clientset{RepoServerServiceClient: &mocks.RepoServerServiceClient{}} + kubeclientset := fake.NewSimpleClientset(test.NewFakeConfigMap(), test.NewFakeSecret()) + redis, redisCloser := test.NewInMemoryRedis() + defer redisCloser() + s := NewServer( + context.Background(), + ArgoCDServerOpts{ + ListenPort: port, + Namespace: test.FakeArgoCDNamespace, + KubeClientset: kubeclientset, + AppClientset: apps.NewSimpleClientset(), + RepoClientset: mockRepoClient, + RedisClient: redis, + }, + ApplicationSetOpts{}, + ) + + projInformerCancel := test.StartInformer(s.projInformer) + defer projInformerCancel() + appInformerCancel := test.StartInformer(s.appInformer) + defer appInformerCancel() + appsetInformerCancel := test.StartInformer(s.appsetInformer) + defer appsetInformerCancel() + + lns, err := s.Listen() + require.NoError(t, err) + + shutdown := false + runCtx, runCancel := context.WithTimeout(context.Background(), 2*time.Second) + defer runCancel() + + err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}}) + require.Error(t, err, "API Server is not running. 
It either hasn't started or is restarting.") + + var wg gosync.WaitGroup + wg.Add(1) + go func(shutdown *bool) { + defer wg.Done() + s.Run(runCtx, lns) + *shutdown = true + }(&shutdown) + + for { + if s.available.Load() { + err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}}) + require.NoError(t, err) + break + } + time.Sleep(10 * time.Millisecond) + } + + s.stopCh <- syscall.SIGINT + + wg.Wait() + + err = s.healthCheck(&http.Request{URL: &url.URL{Path: "/healthz", RawQuery: "full=true"}}) + require.Error(t, err, "API Server is terminating and unable to serve requests.") + + assert.True(t, s.terminateRequested.Load()) + assert.True(t, s.receivedSignal.Load()) + assert.False(t, s.available.Load()) + assert.True(t, shutdown) +} + func TestAuthenticate(t *testing.T) { type testData struct { test string diff --git a/test/e2e/aa_graceful_restart_test.go b/test/e2e/aa_graceful_restart_test.go new file mode 100644 index 00000000000000..dd5eba7fa8cea3 --- /dev/null +++ b/test/e2e/aa_graceful_restart_test.go @@ -0,0 +1,27 @@ +package e2e + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/argoproj/argo-cd/v2/test/e2e/fixture" + . 
"github.com/argoproj/argo-cd/v2/test/e2e/fixture" +) + +func TestAPIServerGracefulRestart(t *testing.T) { + EnsureCleanState(t) + + resp, err := DoHttpRequest("GET", "/healthz?full=true", "") + require.NoError(t, err) + responseData, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "ok\n", string(responseData)) + fixture.RestartAPIServer() + resp, err = DoHttpRequest("GET", "/healthz?full=true", "") + require.NoError(t, err) + responseData, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "ok\n", string(responseData)) +} diff --git a/test/e2e/fixture/fixture.go b/test/e2e/fixture/fixture.go index f2f366c2012404..c9fbc404512ca9 100644 --- a/test/e2e/fixture/fixture.go +++ b/test/e2e/fixture/fixture.go @@ -942,33 +942,29 @@ func RemoveSubmodule() { // RestartRepoServer performs a restart of the repo server deployment and waits // until the rollout has completed. func RestartRepoServer() { - if IsRemote() { - log.Infof("Waiting for repo server to restart") - prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX") - workload := "argocd-repo-server" - if prefix != "" { - workload = prefix + "-repo-server" - } - FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload)) - FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload)) - // wait longer to avoid error on s390x - time.Sleep(10 * time.Second) + log.Infof("Waiting for repo server to restart") + prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX") + workload := "argocd-repo-server" + if prefix != "" { + workload = prefix + "-repo-server" } + FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload)) + FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload)) + // wait longer to avoid error on s390x + time.Sleep(10 * time.Second) } // RestartAPIServer performs a restart of the API server deployemt and waits // until the rollout 
has completed. func RestartAPIServer() { - if IsRemote() { - log.Infof("Waiting for API server to restart") - prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX") - workload := "argocd-server" - if prefix != "" { - workload = prefix + "-server" - } - FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload)) - FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload)) + log.Infof("Waiting for API server to restart") + prefix := os.Getenv("ARGOCD_E2E_NAME_PREFIX") + workload := "argocd-server" + if prefix != "" { + workload = prefix + "-server" } + FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "restart", "deployment", workload)) + FailOnErr(Run("", "kubectl", "rollout", "-n", TestNamespace(), "status", "deployment", workload)) } // LocalOrRemotePath selects a path for a given application based on whether