From 8f6cbeae9806668a4424a7241eb7594c4649943d Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Fri, 17 May 2019 13:39:49 -0400 Subject: [PATCH 1/6] use exec probe for queue proxy readiness check Replaces HTTPGet ReadinessProbe with ExecAction. The ExecAction recreates the HTTPGet semantics within the queue proxy container and hits the same previous health check HTTP path. This approach was taken since the queue proxy process maintains state on whether it's active. --- cmd/queue/main.go | 56 ++++++++++++++++- cmd/queue/main_test.go | 71 ++++++++++++++++++++++ pkg/reconciler/revision/resources/queue.go | 11 +--- 3 files changed, 127 insertions(+), 11 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index ed7b4a1c4d42..0c1232223685 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -18,6 +18,7 @@ package main import ( "context" + "errors" "flag" "fmt" "net/http" @@ -29,8 +30,6 @@ import ( "strings" "time" - "go.uber.org/zap" - "github.com/knative/pkg/logging/logkey" "github.com/knative/pkg/metrics" "github.com/knative/pkg/signals" @@ -45,7 +44,7 @@ import ( "github.com/knative/serving/pkg/queue" "github.com/knative/serving/pkg/queue/health" queuestats "github.com/knative/serving/pkg/queue/stats" - + "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) @@ -92,6 +91,8 @@ var ( healthState = &health.State{} promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter. + + probe = flag.Bool("probe", false, "run readiness probe") ) func initEnv() { @@ -218,8 +219,57 @@ func createAdminHandlers() *http.ServeMux { return mux } +func probeQueueHealthPath(port string) error { + url := fmt.Sprintf("http://127.0.0.1:%s%s", port, queue.RequestQueueHealthPath) + + httpClient := &http.Client{ + Transport: &http.Transport{ + // Do not use the cached connection + DisableKeepAlives: true, + }, + Timeout: 500 * time.Millisecond, + } + + var lastErr error + + err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + var res *http.Response + res, lastErr = httpClient.Get(url) + + if res == nil { + return false, nil + } + + defer res.Body.Close() + return res.StatusCode == http.StatusOK, nil + }) + + if lastErr != nil { + return fmt.Errorf("failed to probe: %s", lastErr) + } + + // An http.StatusOK was never returned during probing + if err != nil { + return errors.New("probe returned not ready") + } + + return nil +} + func main() { flag.Parse() + + if *probe { + port := strconv.Itoa(networking.QueueAdminPort) + + if err := probeQueueHealthPath(port); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + os.Exit(0) + } + logger, _ = logging.NewLogger(os.Getenv("SERVING_LOGGING_CONFIG"), os.Getenv("SERVING_LOGGING_LEVEL")) logger = logger.Named("queueproxy") defer flush(logger) diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 6286831ff8a3..b033859a46be 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -154,3 +154,74 @@ func TestCreateVarLogLink(t *testing.T) { t.Errorf("Incorrect symlink = %q, want %q, diff: %s", got, want, cmp.Diff(got, want)) } } + +func TestProbeQueueConnectionFailure(t *testing.T) { + port := "12345" // some random port (that's not listening) + + if err := probeQueueHealthPath(port); err == nil { + t.Error("Expected error, got nil") + } +} + +func TestProbeQueueNotReady(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + w.WriteHeader(http.StatusBadRequest) + })) + + defer ts.Close() + + port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + err := probeQueueHealthPath(port) + + if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { + t.Errorf("Unexpected not ready message: %s", diff) + } + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") + } +} + +func TestProbeQueueReady(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + if err := probeQueueHealthPath(port); err != nil { + t.Errorf("probeQueueHealthPath(%s) = %s", port, err) + } + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") + } +} + +func TestProbeQueueDelayedReady(t *testing.T) { + count := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // we expect roughly 10 probes per second + if count < 9 { + w.WriteHeader(http.StatusBadRequest) + count++ + return + } + + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + if err := probeQueueHealthPath(port); err != nil { + t.Errorf("probeQueueHealthPath(%s) = %s", port, err) + } +} diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index ab8e097084f0..db021c6beb3f 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -20,10 +20,6 @@ import ( "math" "strconv" - "k8s.io/apimachinery/pkg/api/resource" - - "k8s.io/apimachinery/pkg/util/intstr" - "github.com/knative/pkg/logging" pkgmetrics "github.com/knative/pkg/metrics" "github.com/knative/pkg/system" @@ -33,8 +29,8 @@ import ( "github.com/knative/serving/pkg/autoscaler" "github.com/knative/serving/pkg/deployment" "github.com/knative/serving/pkg/metrics" - "github.com/knative/serving/pkg/queue" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -63,9 +59,8 @@ var ( queueReadinessProbe = &corev1.Probe{ Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Port: intstr.FromInt(networking.QueueAdminPort), - Path: queue.RequestQueueHealthPath, + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "true"}, }, }, // We want to mark the service as not ready as soon as the From fd3445073998b542d89bc05015ab9683b2658a51 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Mon, 27 May 2019 14:47:55 -0400 Subject: [PATCH 2/6] use descriptive name for error, pass port as int to probeQueueHealthPath() --- cmd/queue/main.go | 16 +++++++--------- cmd/queue/main_test.go | 33 +++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 17 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 0c1232223685..75e9ed0486b6 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -18,7 +18,6 @@ package main import ( "context" - "errors" "flag" "fmt" "net/http" @@ -44,6 +43,7 @@ import ( "github.com/knative/serving/pkg/queue" "github.com/knative/serving/pkg/queue/health" queuestats "github.com/knative/serving/pkg/queue/stats" + "github.com/pkg/errors" "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) @@ -219,8 +219,8 @@ func createAdminHandlers() *http.ServeMux { return mux } -func probeQueueHealthPath(port string) error { - url := fmt.Sprintf("http://127.0.0.1:%s%s", port, queue.RequestQueueHealthPath) +func probeQueueHealthPath(port int) error { + url := fmt.Sprintf("http://127.0.0.1:%d%s", port, queue.RequestQueueHealthPath) httpClient := &http.Client{ Transport: &http.Transport{ @@ -232,7 +232,7 @@ func probeQueueHealthPath(port string) error { var lastErr error - err := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + timeoutErr := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { var res *http.Response res, lastErr = httpClient.Get(url) @@ -245,11 +245,11 @@ func probeQueueHealthPath(port string) error { }) if lastErr != nil { - return fmt.Errorf("failed to probe: %s", lastErr) + return errors.Wrap(lastErr, "failed to probe") } // An http.StatusOK was never returned during probing - if err != nil { + if timeoutErr != nil { return errors.New("probe returned not ready") } @@ -260,9 +260,7 @@ func main() { flag.Parse() if *probe { - port := strconv.Itoa(networking.QueueAdminPort) - - if err := probeQueueHealthPath(port); err != nil { + if err := probeQueueHealthPath(networking.QueueAdminPort); err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) } diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index b033859a46be..8e4f58667343 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -24,6 +24,7 @@ import ( "net/url" "os" "path" + "strconv" "strings" "testing" "time" @@ -156,7 +157,7 @@ func TestCreateVarLogLink(t *testing.T) { } func TestProbeQueueConnectionFailure(t *testing.T) { - port := "12345" // some random port (that's not listening) + port := 12345 // some random port (that's not listening) if err := probeQueueHealthPath(port); err == nil { t.Error("Expected error, got nil") @@ -172,8 +173,14 @@ func TestProbeQueueNotReady(t *testing.T) { defer ts.Close() - port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") - err := probeQueueHealthPath(port) + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + err = probeQueueHealthPath(port) if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { t.Errorf("Unexpected not ready message: %s", diff) @@ -193,10 +200,15 @@ func TestProbeQueueReady(t *testing.T) { defer ts.Close() - port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") - if err := probeQueueHealthPath(port); err != nil { - t.Errorf("probeQueueHealthPath(%s) = %s", port, err) + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + if err = probeQueueHealthPath(port); err != nil { + t.Errorf("probeQueueHealthPath(%d) = %s", port, err) } if !queueProbed { @@ -219,9 +231,14 @@ func TestProbeQueueDelayedReady(t *testing.T) { defer ts.Close() - port := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } if err := probeQueueHealthPath(port); err != nil { - t.Errorf("probeQueueHealthPath(%s) = %s", port, err) + t.Errorf("probeQueueHealthPath(%d) = %s", port, err) } } From bf6cd5e5c226dd790fe1fb2f33c03802594719ac Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Wed, 29 May 2019 17:45:14 -0400 Subject: [PATCH 3/6] pull probe timeout into constant, tighten up retry interval, clean up imports --- cmd/queue/main.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index 75e9ed0486b6..af6d498365d8 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -63,6 +63,10 @@ const ( // in the mesh. quitSleepDuration = 20 * time.Second + // Set equal to the queue-proxy's ExecProbe timeout to take + // advantage of the full window + queueProbeTimeout = 10 * time.Second + badProbeTemplate = "unexpected probe header value: %s" ) @@ -227,12 +231,12 @@ func probeQueueHealthPath(port int) error { // Do not use the cached connection DisableKeepAlives: true, }, - Timeout: 500 * time.Millisecond, + Timeout: queueProbeTimeout, } var lastErr error - timeoutErr := wait.PollImmediate(100*time.Millisecond, time.Second, func() (bool, error) { + timeoutErr := wait.PollImmediate(25*time.Millisecond, queueProbeTimeout, func() (bool, error) { var res *http.Response res, lastErr = httpClient.Get(url) @@ -261,6 +265,7 @@ func main() { if *probe { if err := probeQueueHealthPath(networking.QueueAdminPort); err != nil { + // used instead of the logger to produce a concise event message fmt.Fprintln(os.Stderr, err) os.Exit(1) } From fdeb347866d315fdb59fff21ea82feb10acb5d37 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Thu, 30 May 2019 10:09:58 -0400 Subject: [PATCH 4/6] add comment explaining queue probe retry interval --- cmd/queue/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index af6d498365d8..c0632fbc67a3 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -236,6 +236,9 @@ func probeQueueHealthPath(port int) error { var lastErr error + // The 25 millisecond retry interval is an unscientific compromise between wanting to get + // started as early as possible while still wanting to give the container some breathing + // room to get up and running. timeoutErr := wait.PollImmediate(25*time.Millisecond, queueProbeTimeout, func() (bool, error) { var res *http.Response res, lastErr = httpClient.Get(url) From 1828a41d13710d55acb93cfc050b7f792a9bdc25 Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Thu, 30 May 2019 10:44:50 -0400 Subject: [PATCH 5/6] remove out of date comment --- cmd/queue/main_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 8e4f58667343..649775044c17 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -219,7 +219,6 @@ func TestProbeQueueReady(t *testing.T) { func TestProbeQueueDelayedReady(t *testing.T) { count := 0 ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // we expect roughly 10 probes per second if count < 9 { w.WriteHeader(http.StatusBadRequest) count++ From 75df79335c9d049924df29e546a55870793a7a1c Mon Sep 17 00:00:00 2001 From: Joshua Rider Date: Fri, 31 May 2019 09:42:10 -0400 Subject: [PATCH 6/6] rename queueProbeTimeout, use constant in probeUserContainer --- cmd/queue/main.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/queue/main.go b/cmd/queue/main.go index c0632fbc67a3..3270f2cc5752 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -65,7 +65,7 @@ const ( // Set equal to the queue-proxy's ExecProbe timeout to take // advantage of the full window - queueProbeTimeout = 10 * time.Second + probeTimeout = 10 * time.Second badProbeTemplate = "unexpected probe header value: %s" ) @@ -150,7 +150,7 @@ func knativeProxyHeader(r *http.Request) string { func probeUserContainer() bool { var err error - wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) { + wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) { logger.Debug("TCP probing the user-container.") err = health.TCPProbe(userTargetAddress, 100*time.Millisecond) return err == nil, nil @@ -231,7 +231,7 @@ func probeQueueHealthPath(port int) error { // Do not use the cached connection DisableKeepAlives: true, }, - Timeout: queueProbeTimeout, + Timeout: probeTimeout, } var lastErr error @@ -239,7 +239,7 @@ func probeQueueHealthPath(port int) error { // The 25 millisecond retry interval is an unscientific compromise between wanting to get // started as early as possible while still wanting to give the container some breathing // room to get up and running. - timeoutErr := wait.PollImmediate(25*time.Millisecond, queueProbeTimeout, func() (bool, error) { + timeoutErr := wait.PollImmediate(25*time.Millisecond, probeTimeout, func() (bool, error) { var res *http.Response res, lastErr = httpClient.Get(url)