From d65c09a444776e3c35fac5dc4ec70647bc685dc7 Mon Sep 17 00:00:00 2001 From: Julio Greff Date: Mon, 5 Jul 2021 16:15:23 +0200 Subject: [PATCH] [DCA] Implement apiserver/scheduler/controller-manager liveness checks Previously, the kube_apiserver_controlplane used ComponentStatus to report control plane components' liveness. This has been deprecated in [Kubernetes 1.19](https://github.com/kubernetes/kubernetes/pull/93570) and will be removed at some point in the future. To remediate that, we're following the recommendation in the deprecation notice to use the components' own health check endpoints. --- .../kubernetes_apiserver.go | 133 ++++++++++++++++-- pkg/util/kubernetes/apiserver/apiserver.go | 11 ++ 2 files changed, 136 insertions(+), 8 deletions(-) diff --git a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go index 6dbef110443b2..6a3fe758851f6 100644 --- a/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go +++ b/pkg/collector/corechecks/cluster/kubernetesapiserver/kubernetes_apiserver.go @@ -11,6 +11,8 @@ import ( "context" "errors" "fmt" + "net/http" + "os" "strings" "time" @@ -42,6 +44,7 @@ const ( defaultCacheExpire = 2 * time.Minute defaultCachePurge = 10 * time.Minute + defaultTimeout = time.Second ) // KubeASConfig is the config of the API server. @@ -181,13 +184,16 @@ func (k *KubeASCheck) Run() error { } // Running the Control Plane status check. - componentsStatus, err := k.ac.ComponentStatuses() - if err != nil { - k.Warnf("Could not retrieve the status from the control plane's components %s", err.Error()) //nolint:errcheck + useComponentStatus := false + if useComponentStatus { + err = k.componentStatusCheck(sender) + if err != nil { + k.Warnf("Could not collect control plane status from ComponentStatus: %s", err.Error()) //nolint:errcheck + } } else { - err = k.parseComponentStatus(sender, componentsStatus) + err = k.controlPlaneHealthCheck(context.TODO(), sender) if err != nil { - k.Warnf("Could not collect API Server component status: %s", err.Error()) //nolint:errcheck + k.Warnf("Could not collect control plane status from health thecks: %s", err.Error()) //nolint:errcheck } } @@ -251,7 +257,6 @@ func (k *KubeASCheck) eventCollectionCheck() (newEvents []*v1.Event, err error) func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsStatus *v1.ComponentStatusList) error { for _, component := range componentsStatus.Items { - if component.ObjectMeta.Name == "" { return errors.New("metadata structure has changed. Not collecting API Server's Components status") } @@ -259,7 +264,7 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS log.Debug("API Server component's structure is not expected") continue } - tagComp := []string{fmt.Sprintf("component:%s", component.Name)} + for _, condition := range component.Conditions { statusCheck := metrics.ServiceCheckUnknown message := "" @@ -269,6 +274,7 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS log.Debugf("Condition %q not supported", condition.Type) continue } + // We only expect True, False and Unknown (default). switch condition.Status { case "True": @@ -277,8 +283,12 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS case "False": statusCheck = metrics.ServiceCheckCritical message = condition.Error + if message == "" { + message = condition.Message + } } - sender.ServiceCheck(KubeControlPaneCheck, statusCheck, "", tagComp, message) + + sendControlPlaneServiceCheck(sender, statusCheck, component.Name, message) } } return nil @@ -316,6 +326,113 @@ func (k *KubeASCheck) processEvents(sender aggregator.Sender, events []*v1.Event return nil } +func (k *KubeASCheck) componentStatusCheck(sender aggregator.Sender) error { + componentsStatus, err := k.ac.ComponentStatuses() + if err != nil { + return err + } + + err = k.parseComponentStatus(sender, componentsStatus) + if err != nil { + return err + } + + return nil +} + +func (k *KubeASCheck) controlPlaneHealthCheck(ctx context.Context, sender aggregator.Sender) error { + var ( + msg string + status metrics.ServiceCheckStatus + err error + ) + + // Check controller-manager/scheduler liveness + host := os.Getenv("KUBERNETES_SERVICE_HOST") + healthEndpoints := map[string]string{ + "kube-controller-manager": fmt.Sprintf("https://%s:10257/healthz", host), + "kube-scheduler": fmt.Sprintf("https://%s:10259/healthz", host), + } + + for component, url := range healthEndpoints { + status, err = queryHealthEndpoint(ctx, url) + + if err == nil { + msg = "OK" + } else { + msg = err.Error() + } + + sendControlPlaneServiceCheck(sender, status, component, msg) + } + + // Check apiserver liveness + client, err := apiserver.GetAPIClient() + if err == nil { + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + + msg, err = client.GetReadiness(ctx) + if err == nil { + if msg == "ok" { + status = metrics.ServiceCheckOK + } else { + status = metrics.ServiceCheckCritical + } + } else { + msg = err.Error() + status = metrics.ServiceCheckCritical + } + } else { + msg = err.Error() + status = metrics.ServiceCheckUnknown + } + + // The ComponentStatus version of this service check used to report + // etcd liveness. There is no recommended way of checking just etcd + // liveness through the API server, so this is really an API server + // liveness check. We also report just "etcd" for backwards + // compatibility. + sendControlPlaneServiceCheck(sender, status, "apiserver", msg) + sendControlPlaneServiceCheck(sender, status, "etcd", msg) + + return nil +} + +func sendControlPlaneServiceCheck(sender aggregator.Sender, status metrics.ServiceCheckStatus, component, message string) { + tags := []string{fmt.Sprintf("component:%s", component)} + sender.ServiceCheck(KubeControlPaneCheck, status, "", tags, message) +} + +func queryHealthEndpoint(ctx context.Context, url string) (metrics.ServiceCheckStatus, error) { + ctx, cancel := context.WithTimeout(ctx, defaultTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return metrics.ServiceCheckUnknown, err + } + + // TODO(juliogreff): need to figure out how to create a transport :'( + client := &http.Client{ + Transport: http.DefaultTransport.(*http.Transport).Clone(), + } + + resp, err := client.Do(req) + if err != nil { + return metrics.ServiceCheckCritical, err + } + defer resp.Body.Close() + + switch resp.StatusCode { + case http.StatusOK: + return metrics.ServiceCheckOK, nil + default: + err := fmt.Errorf("Health endpoint returned non-OK status: %d", resp.StatusCode) + return metrics.ServiceCheckCritical, err + } +} + // bundleID generates a unique ID to separate k8s events // based on their InvolvedObject UIDs and event Types func bundleID(e *v1.Event) string { diff --git a/pkg/util/kubernetes/apiserver/apiserver.go b/pkg/util/kubernetes/apiserver/apiserver.go index df02d0a4b1046..ae32eedbbd7bb 100644 --- a/pkg/util/kubernetes/apiserver/apiserver.go +++ b/pkg/util/kubernetes/apiserver/apiserver.go @@ -608,6 +608,17 @@ func (c *APIClient) GetRESTObject(path string, output runtime.Object) error { return result.Into(output) } +// GetReadiness retrieves the API Server readiness status +func (c *APIClient) GetReadiness(ctx context.Context) (string, error) { + path := "/readyz" + content, err := c.Cl.Discovery().RESTClient().Get().AbsPath(path).DoRaw(ctx) + if err != nil { + return "", err + } + + return string(content), nil +} + func convertmetadataMapperBundleToAPI(input *metadataMapperBundle) *apiv1.MetadataResponseBundle { output := apiv1.NewMetadataResponseBundle() if input == nil {