Skip to content

Commit

Permalink
[DCA] Implement apiserver/scheduler/controller-manager liveness checks
Browse files Browse the repository at this point in the history
Previously, the kube_apiserver_controlplane check used ComponentStatus to
report control plane components' liveness. ComponentStatus has been deprecated in
[Kubernetes 1.19](kubernetes/kubernetes#93570)
and will be removed at some point in the future.

To remediate that, we're following the recommendation in the deprecation
notice to use the components' own health check endpoints.
  • Loading branch information
juliogreff committed Jul 6, 2021
1 parent b39b28d commit d65c09a
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"context"
"errors"
"fmt"
"net"
"net/http"
"os"
"strings"
"time"

Expand Down Expand Up @@ -42,6 +44,7 @@ const (

defaultCacheExpire = 2 * time.Minute
defaultCachePurge = 10 * time.Minute
defaultTimeout = time.Second
)

// KubeASConfig is the config of the API server.
Expand Down Expand Up @@ -181,13 +184,16 @@ func (k *KubeASCheck) Run() error {
}

// Running the Control Plane status check.
componentsStatus, err := k.ac.ComponentStatuses()
if err != nil {
k.Warnf("Could not retrieve the status from the control plane's components %s", err.Error()) //nolint:errcheck
useComponentStatus := false
if useComponentStatus {
err = k.componentStatusCheck(sender)
if err != nil {
k.Warnf("Could not collect control plane status from ComponentStatus: %s", err.Error()) //nolint:errcheck
}
} else {
err = k.parseComponentStatus(sender, componentsStatus)
err = k.controlPlaneHealthCheck(context.TODO(), sender)
if err != nil {
k.Warnf("Could not collect API Server component status: %s", err.Error()) //nolint:errcheck
k.Warnf("Could not collect control plane status from health thecks: %s", err.Error()) //nolint:errcheck
}
}

Expand Down Expand Up @@ -251,15 +257,14 @@ func (k *KubeASCheck) eventCollectionCheck() (newEvents []*v1.Event, err error)

func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsStatus *v1.ComponentStatusList) error {
for _, component := range componentsStatus.Items {

if component.ObjectMeta.Name == "" {
return errors.New("metadata structure has changed. Not collecting API Server's Components status")
}
if component.Conditions == nil || component.Name == "" {
log.Debug("API Server component's structure is not expected")
continue
}
tagComp := []string{fmt.Sprintf("component:%s", component.Name)}

for _, condition := range component.Conditions {
statusCheck := metrics.ServiceCheckUnknown
message := ""
Expand All @@ -269,6 +274,7 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS
log.Debugf("Condition %q not supported", condition.Type)
continue
}

// We only expect True, False and Unknown (default).
switch condition.Status {
case "True":
Expand All @@ -277,8 +283,12 @@ func (k *KubeASCheck) parseComponentStatus(sender aggregator.Sender, componentsS
case "False":
statusCheck = metrics.ServiceCheckCritical
message = condition.Error
if message == "" {
message = condition.Message
}
}
sender.ServiceCheck(KubeControlPaneCheck, statusCheck, "", tagComp, message)

sendControlPlaneServiceCheck(sender, statusCheck, component.Name, message)
}
}
return nil
Expand Down Expand Up @@ -316,6 +326,113 @@ func (k *KubeASCheck) processEvents(sender aggregator.Sender, events []*v1.Event
return nil
}

// componentStatusCheck fetches the ComponentStatus list through the API
// client and hands it to parseComponentStatus, which emits the service
// checks. It returns the first error met, either from the fetch or from the
// parsing step.
func (k *KubeASCheck) componentStatusCheck(sender aggregator.Sender) error {
	statuses, fetchErr := k.ac.ComponentStatuses()
	if fetchErr != nil {
		return fetchErr
	}

	return k.parseComponentStatus(sender, statuses)
}

// controlPlaneHealthCheck reports control plane liveness by querying the
// components' own health endpoints instead of ComponentStatus.
//
// It sends one service check per component:
//   - kube-controller-manager and kube-scheduler: result of a GET on their
//     /healthz endpoint (ports 10257 and 10259) on the host given by the
//     KUBERNETES_SERVICE_HOST environment variable;
//   - apiserver and etcd: result of the API server /readyz call.
//
// Failures are reported through the service checks themselves; the returned
// error is currently always nil.
func (k *KubeASCheck) controlPlaneHealthCheck(ctx context.Context, sender aggregator.Sender) error {
	// Check controller-manager/scheduler liveness.
	// net.JoinHostPort brackets IPv6 literals, so the URL stays valid when
	// KUBERNETES_SERVICE_HOST holds an IPv6 address.
	host := os.Getenv("KUBERNETES_SERVICE_HOST")
	healthEndpoints := map[string]string{
		"kube-controller-manager": fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10257")),
		"kube-scheduler":          fmt.Sprintf("https://%s/healthz", net.JoinHostPort(host, "10259")),
	}

	for component, url := range healthEndpoints {
		status, err := queryHealthEndpoint(ctx, url)

		msg := "OK"
		if err != nil {
			msg = err.Error()
		}

		sendControlPlaneServiceCheck(sender, status, component, msg)
	}

	// Check apiserver liveness.
	var (
		status metrics.ServiceCheckStatus
		msg    string
	)

	client, err := apiserver.GetAPIClient()
	if err != nil {
		// Without a client we cannot tell anything about the API server.
		status = metrics.ServiceCheckUnknown
		msg = err.Error()
	} else {
		readyCtx, cancel := context.WithTimeout(ctx, defaultTimeout)
		defer cancel()

		msg, err = client.GetReadiness(readyCtx)
		switch {
		case err != nil:
			status = metrics.ServiceCheckCritical
			msg = err.Error()
		case msg == "ok":
			status = metrics.ServiceCheckOK
		default:
			// Any body other than "ok" is treated as not ready.
			status = metrics.ServiceCheckCritical
		}
	}

	// The ComponentStatus version of this service check used to report
	// etcd liveness. There is no recommended way of checking just etcd
	// liveness through the API server, so this is really an API server
	// liveness check. We also report just "etcd" for backwards
	// compatibility.
	sendControlPlaneServiceCheck(sender, status, "apiserver", msg)
	sendControlPlaneServiceCheck(sender, status, "etcd", msg)

	return nil
}

// sendControlPlaneServiceCheck emits the KubeControlPaneCheck service check
// with the given status and message, tagged with "component:<component>".
// The hostname argument is left empty.
func sendControlPlaneServiceCheck(sender aggregator.Sender, status metrics.ServiceCheckStatus, component, message string) {
	componentTag := fmt.Sprintf("component:%s", component)
	sender.ServiceCheck(KubeControlPaneCheck, status, "", []string{componentTag}, message)
}

// queryHealthEndpoint issues a GET on url, bounded by defaultTimeout, and
// maps the outcome to a service check status:
//   - the request cannot be built: ServiceCheckUnknown and the error;
//   - the request fails (including timeout): ServiceCheckCritical and the error;
//   - HTTP 200: ServiceCheckOK and a nil error;
//   - any other status code: ServiceCheckCritical and a descriptive error.
func queryHealthEndpoint(ctx context.Context, url string) (metrics.ServiceCheckStatus, error) {
	ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return metrics.ServiceCheckUnknown, err
	}

	// TODO(juliogreff): need to figure out how to create a transport :'(
	transport := http.DefaultTransport.(*http.Transport).Clone()
	// The transport lives for a single request, so a kept-alive connection
	// could never be reused; disable keep-alives so that no idle connection
	// is left behind after each probe.
	transport.DisableKeepAlives = true
	client := &http.Client{Transport: transport}

	resp, err := client.Do(req)
	if err != nil {
		return metrics.ServiceCheckCritical, err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return metrics.ServiceCheckCritical, fmt.Errorf("Health endpoint returned non-OK status: %d", resp.StatusCode)
	}

	return metrics.ServiceCheckOK, nil
}

// bundleID generates a unique ID to separate k8s events
// based on their InvolvedObject UIDs and event Types
func bundleID(e *v1.Event) string {
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/kubernetes/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,17 @@ func (c *APIClient) GetRESTObject(path string, output runtime.Object) error {
return result.Into(output)
}

// GetReadiness queries the API Server "/readyz" endpoint through the
// discovery REST client and returns the raw response body as a string.
// On failure it returns an empty string together with the request error.
func (c *APIClient) GetReadiness(ctx context.Context) (string, error) {
	body, err := c.Cl.Discovery().RESTClient().Get().AbsPath("/readyz").DoRaw(ctx)
	if err != nil {
		return "", err
	}

	return string(body), nil
}

func convertmetadataMapperBundleToAPI(input *metadataMapperBundle) *apiv1.MetadataResponseBundle {
output := apiv1.NewMetadataResponseBundle()
if input == nil {
Expand Down

0 comments on commit d65c09a

Please sign in to comment.