From cafb7aeb4d92b5191aa14c9eed6e57f3b1c2236e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Nussbaumer?= Date: Mon, 22 Jul 2024 16:29:59 +0200 Subject: [PATCH] chore: proper Context handling and simplifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Clément Nussbaumer --- internal/kubenurse/handler_test.go | 3 +- internal/kubenurse/server.go | 25 +++++++++---- internal/kubenurse/server_test.go | 9 +++-- internal/servicecheck/servicecheck.go | 42 ++++++---------------- internal/servicecheck/servicecheck_test.go | 19 ++-------- internal/servicecheck/types.go | 5 +-- main.go | 11 ++---- 7 files changed, 41 insertions(+), 73 deletions(-) diff --git a/internal/kubenurse/handler_test.go b/internal/kubenurse/handler_test.go index c459b0cc..2f586056 100644 --- a/internal/kubenurse/handler_test.go +++ b/internal/kubenurse/handler_test.go @@ -1,7 +1,6 @@ package kubenurse import ( - "context" "net/http" "net/http/httptest" "testing" @@ -14,7 +13,7 @@ func TestServerHandler(t *testing.T) { r := require.New(t) fakeClient := fake.NewFakeClient() - kubenurse, err := New(context.Background(), fakeClient) + kubenurse, err := New(fakeClient) r.NoError(err) r.NotNil(kubenurse) diff --git a/internal/kubenurse/server.go b/internal/kubenurse/server.go index b259eac8..a045fdec 100644 --- a/internal/kubenurse/server.go +++ b/internal/kubenurse/server.go @@ -59,7 +59,7 @@ type Server struct { // * KUBENURSE_CHECK_ME_SERVICE // * KUBENURSE_CHECK_NEIGHBOURHOOD // * KUBENURSE_CHECK_INTERVAL -func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity +func New(c client.Client) (*Server, error) { //nolint:funlen // TODO: use a flag parsing library (e.g. ff) to reduce complexity mux := http.NewServeMux() checkInterval := defaultCheckInterval @@ -134,7 +134,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle } // setup checker - chk, err := servicecheck.New(ctx, c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets) + chk, err := servicecheck.New(c, promRegistry, server.allowUnschedulable, 1*time.Second, histogramBuckets) if err != nil { return nil, err } @@ -189,7 +189,7 @@ func New(ctx context.Context, c client.Client) (*Server, error) { //nolint:funle } // Run starts the periodic checker and the http/https server(s) and blocks until Shutdown was called. -func (s *Server) Run() error { +func (s *Server) Run(ctx context.Context) error { var ( wg sync.WaitGroup errc = make(chan error, 2) // max two errors can happen @@ -211,7 +211,17 @@ func (s *Server) Run() error { go func() { defer wg.Done() - s.checker.RunScheduled(s.checkInterval) + ticker := time.NewTicker(s.checkInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + s.checker.Run(ctx) + case <-ctx.Done(): + return + } + } }() wg.Add(1) @@ -259,7 +269,7 @@ func (s *Server) Run() error { } // Shutdown disables the readiness probe and then gracefully halts the kubenurse http/https server(s). -func (s *Server) Shutdown(ctx context.Context) error { +func (s *Server) Shutdown() error { s.ready.Store(false) // wait before actually shutting down the http/s server, as the updated @@ -268,8 +278,9 @@ func (s *Server) Shutdown(ctx context.Context) error { // me_ingress or path errors in other pods time.Sleep(s.checker.ShutdownDuration) - // stop the scheduled checker - s.checker.StopScheduled() + // background ctx since, the "root" context is already canceled + ctx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() if err := s.http.Shutdown(ctx); err != nil { return fmt.Errorf("stop http server: %w", err) diff --git a/internal/kubenurse/server_test.go b/internal/kubenurse/server_test.go index 5e38c1a0..130e97ba 100644 --- a/internal/kubenurse/server_test.go +++ b/internal/kubenurse/server_test.go @@ -3,6 +3,7 @@ package kubenurse import ( "context" "testing" + "time" "github.com/stretchr/testify/require" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -12,7 +13,7 @@ func TestCombined(t *testing.T) { r := require.New(t) fakeClient := fake.NewFakeClient() - kubenurse, err := New(context.Background(), fakeClient) + kubenurse, err := New(fakeClient) r.NoError(err) r.NotNil(kubenurse) @@ -20,16 +21,18 @@ func TestCombined(t *testing.T) { r := require.New(t) errc := make(chan error, 1) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) go func() { // blocks until shutdown is called - err := kubenurse.Run() + err := kubenurse.Run(ctx) errc <- err close(errc) + cancel() }() // Shutdown, Run() should stop after function completes - err := kubenurse.Shutdown(context.Background()) + err := kubenurse.Shutdown() r.NoError(err) err = <-errc // blocks until kubenurse.Run() finishes and eventually returns an error diff --git a/internal/servicecheck/servicecheck.go b/internal/servicecheck/servicecheck.go index 83d59617..f70b3445 100644 --- a/internal/servicecheck/servicecheck.go +++ b/internal/servicecheck/servicecheck.go @@ -25,7 +25,7 @@ const ( // New configures the checker with a httpClient and a cache timeout for check // results. Other parameters of the Checker struct need to be configured separately. -func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, +func New(cl client.Client, promRegistry *prometheus.Registry, allowUnschedulable bool, cacheTTL time.Duration, durationHistogramBuckets []float64) (*Checker, error) { // setup http transport tlsConfig, err := generateTLSConfig(os.Getenv("KUBENURSE_EXTRA_CA")) @@ -63,13 +63,12 @@ func New(_ context.Context, cl client.Client, promRegistry *prometheus.Registry, client: cl, httpClient: httpClient, cacheTTL: cacheTTL, - stop: make(chan struct{}), }, nil } // Run runs all servicechecks and returns the result togeter with a boolean which indicates success. The cache // is respected. -func (c *Checker) Run() { +func (c *Checker) Run(ctx context.Context) { // Run Checks result := sync.Map{} @@ -91,17 +90,17 @@ func (c *Checker) Run() { wg.Add(4) - go c.measure(&wg, &result, c.APIServerDirect, APIServerDirect) - go c.measure(&wg, &result, c.APIServerDNS, APIServerDNS) - go c.measure(&wg, &result, c.MeIngress, meIngress) - go c.measure(&wg, &result, c.MeService, meService) + go c.measure(ctx, &wg, &result, c.APIServerDirect, APIServerDirect) + go c.measure(ctx, &wg, &result, c.APIServerDNS, APIServerDNS) + go c.measure(ctx, &wg, &result, c.MeIngress, meIngress) + go c.measure(ctx, &wg, &result, c.MeService, meService) if c.SkipCheckNeighbourhood { result.Store(NeighbourhoodState, skippedStr) return } - neighbours, err := c.getNeighbours(context.Background(), c.KubenurseNamespace, c.NeighbourFilter) + neighbours, err := c.getNeighbours(ctx, c.KubenurseNamespace, c.NeighbourFilter) if err != nil { result.Store(NeighbourhoodState, err.Error()) return @@ -121,33 +120,12 @@ func (c *Checker) Run() { return c.doRequest(ctx, podIPtoURL(neighbour.PodIP, c.UseTLS), true) } - go c.measure(&wg, &result, check, "path_"+neighbour.NodeName) + go c.measure(ctx, &wg, &result, check, "path_"+neighbour.NodeName) } wg.Wait() } -// RunScheduled runs the checks in the specified interval which can be used to keep the metrics up-to-date. This -// function does not return until StopScheduled is called. -func (c *Checker) RunScheduled(d time.Duration) { - ticker := time.NewTicker(d) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.Run() - case <-c.stop: - return - } - } -} - -// StopScheduled is used to stop the scheduled run of checks. -func (c *Checker) StopScheduled() { - close(c.stop) -} - // APIServerDirect checks the /version endpoint of the Kubernetes API Server through the direct link func (c *Checker) APIServerDirect(ctx context.Context) string { if c.SkipCheckAPIServerDirect { @@ -189,12 +167,12 @@ func (c *Checker) MeService(ctx context.Context) string { } // measure implements metric collections for the check -func (c *Checker) measure(wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) { +func (c *Checker) measure(ctx context.Context, wg *sync.WaitGroup, res *sync.Map, check Check, requestType string) { // Add our label (check type) to the context so our http tracer can annotate // metrics and errors based with the label defer wg.Done() - ctx := context.WithValue(context.Background(), kubenurseTypeKey{}, requestType) + ctx = context.WithValue(ctx, kubenurseTypeKey{}, requestType) res.Store(requestType, check(ctx)) } diff --git a/internal/servicecheck/servicecheck_test.go b/internal/servicecheck/servicecheck_test.go index 5f4a7569..8acb4260 100644 --- a/internal/servicecheck/servicecheck_test.go +++ b/internal/servicecheck/servicecheck_test.go @@ -36,29 +36,14 @@ func TestCombined(t *testing.T) { // fake client, with a dummy neighbour pod fakeClient := fake.NewFakeClient(&fakeNeighbourPod) - checker, err := New(context.Background(), fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets) + checker, err := New(fakeClient, prometheus.NewRegistry(), false, 3*time.Second, prometheus.DefBuckets) r.NoError(err) r.NotNil(checker) t.Run("run", func(t *testing.T) { r := require.New(t) - checker.Run() + checker.Run(context.Background()) r.Equal(okStr, checker.LastCheckResult[NeighbourhoodState]) }) - - t.Run("scheduled", func(t *testing.T) { - stopped := make(chan struct{}) - - go func() { - // blocks until StopScheduled() - checker.RunScheduled(time.Second * 5) - - close(stopped) - }() - - checker.StopScheduled() - - <-stopped - }) } diff --git a/internal/servicecheck/types.go b/internal/servicecheck/types.go index f6f08f6b..5b977284 100644 --- a/internal/servicecheck/types.go +++ b/internal/servicecheck/types.go @@ -25,7 +25,7 @@ type Checker struct { SkipCheckMeIngress bool SkipCheckMeService bool - // shutdownDuration defines the time during which kubenurse will wait before stopping + // shutdownDuration defines the time during which kubenurse will accept https requests during shutdown ShutdownDuration time.Duration // Kubernetes API @@ -55,9 +55,6 @@ type Checker struct { // cacheTTL defines the TTL of how long a cached result is valid cacheTTL time.Duration - - // stop is used to cancel RunScheduled - stop chan struct{} } // Check is the signature used by all checks that the checker can execute. diff --git a/main.go b/main.go index 2fe7c875..3829ded0 100644 --- a/main.go +++ b/main.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/postfinance/kubenurse/internal/kubenurse" corev1 "k8s.io/api/core/v1" @@ -62,7 +61,7 @@ func main() { return } - server, err := kubenurse.New(ctx, c) + server, err := kubenurse.New(c) if err != nil { slog.Error("error in kubenurse.New call", "err", err) return @@ -73,17 +72,13 @@ func main() { slog.Info("shutting down, received signal to stop") - // background ctx since, the "root" context is already canceled - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) - defer shutdownCancel() - - if err := server.Shutdown(shutdownCtx); err != nil { + if err := server.Shutdown(); err != nil { slog.Error("error during graceful shutdown", "err", err) } }() // blocks, until the server is stopped by calling Shutdown() - if err := server.Run(); err != nil { + if err := server.Run(ctx); err != nil { slog.Error("error while running kubenurse", "err", err) } }