diff --git a/cmd/queue/main.go b/cmd/queue/main.go index ed7b4a1c4d42..3270f2cc5752 100644 --- a/cmd/queue/main.go +++ b/cmd/queue/main.go @@ -29,8 +29,6 @@ import ( "strings" "time" - "go.uber.org/zap" - "github.com/knative/pkg/logging/logkey" "github.com/knative/pkg/metrics" "github.com/knative/pkg/signals" @@ -45,7 +43,8 @@ import ( "github.com/knative/serving/pkg/queue" "github.com/knative/serving/pkg/queue/health" queuestats "github.com/knative/serving/pkg/queue/stats" - + "github.com/pkg/errors" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/util/wait" ) @@ -64,6 +63,10 @@ const ( // in the mesh. quitSleepDuration = 20 * time.Second + // Set equal to the queue-proxy's ExecProbe timeout to take + // advantage of the full window + probeTimeout = 10 * time.Second + badProbeTemplate = "unexpected probe header value: %s" ) @@ -92,6 +95,8 @@ var ( healthState = &health.State{} promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter. + + probe = flag.Bool("probe", false, "run readiness probe") ) func initEnv() { @@ -145,7 +150,7 @@ func knativeProxyHeader(r *http.Request) string { func probeUserContainer() bool { var err error - wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) { + wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) { logger.Debug("TCP probing the user-container.") err = health.TCPProbe(userTargetAddress, 100*time.Millisecond) return err == nil, nil @@ -218,8 +223,59 @@ func createAdminHandlers() *http.ServeMux { return mux } +func probeQueueHealthPath(port int) error { + url := fmt.Sprintf("http://127.0.0.1:%d%s", port, queue.RequestQueueHealthPath) + + httpClient := &http.Client{ + Transport: &http.Transport{ + // Do not use the cached connection + DisableKeepAlives: true, + }, + Timeout: probeTimeout, + } + + var lastErr error + + // The 25 millisecond retry interval is an unscientific compromise between wanting to get + // started as early as possible while still wanting to give the container some breathing + // room to get up and running. + timeoutErr := wait.PollImmediate(25*time.Millisecond, probeTimeout, func() (bool, error) { + var res *http.Response + res, lastErr = httpClient.Get(url) + + if res == nil { + return false, nil + } + + defer res.Body.Close() + return res.StatusCode == http.StatusOK, nil + }) + + if lastErr != nil { + return errors.Wrap(lastErr, "failed to probe") + } + + // An http.StatusOK was never returned during probing + if timeoutErr != nil { + return errors.New("probe returned not ready") + } + + return nil +} + func main() { flag.Parse() + + if *probe { + if err := probeQueueHealthPath(networking.QueueAdminPort); err != nil { + // used instead of the logger to produce a concise event message + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } + + os.Exit(0) + } + logger, _ = logging.NewLogger(os.Getenv("SERVING_LOGGING_CONFIG"), os.Getenv("SERVING_LOGGING_LEVEL")) logger = logger.Named("queueproxy") defer flush(logger) diff --git a/cmd/queue/main_test.go b/cmd/queue/main_test.go index 6286831ff8a3..649775044c17 100644 --- a/cmd/queue/main_test.go +++ b/cmd/queue/main_test.go @@ -24,6 +24,7 @@ import ( "net/url" "os" "path" + "strconv" "strings" "testing" "time" @@ -154,3 +155,89 @@ func TestCreateVarLogLink(t *testing.T) { t.Errorf("Incorrect symlink = %q, want %q, diff: %s", got, want, cmp.Diff(got, want)) } } + +func TestProbeQueueConnectionFailure(t *testing.T) { + port := 12345 // some random port (that's not listening) + + if err := probeQueueHealthPath(port); err == nil { + t.Error("Expected error, got nil") + } +} + +func TestProbeQueueNotReady(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + w.WriteHeader(http.StatusBadRequest) + })) + + defer ts.Close() + + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + err = probeQueueHealthPath(port) + + if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" { + t.Errorf("Unexpected not ready message: %s", diff) + } + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") + } +} + +func TestProbeQueueReady(t *testing.T) { + queueProbed := false + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + queueProbed = true + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + if err = probeQueueHealthPath(port); err != nil { + t.Errorf("probeQueueHealthPath(%d) = %s", port, err) + } + + if !queueProbed { + t.Errorf("Expected the queue proxy server to be probed") + } +} + +func TestProbeQueueDelayedReady(t *testing.T) { + count := 0 + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if count < 9 { + w.WriteHeader(http.StatusBadRequest) + count++ + return + } + + w.WriteHeader(http.StatusOK) + })) + + defer ts.Close() + + portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:") + + port, err := strconv.Atoi(portStr) + if err != nil { + t.Fatalf("failed to convert port(%s) to int", portStr) + } + + if err := probeQueueHealthPath(port); err != nil { + t.Errorf("probeQueueHealthPath(%d) = %s", port, err) + } +} diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index ab8e097084f0..db021c6beb3f 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -20,10 +20,6 @@ import ( "math" "strconv" - "k8s.io/apimachinery/pkg/api/resource" - - "k8s.io/apimachinery/pkg/util/intstr" - "github.com/knative/pkg/logging" pkgmetrics "github.com/knative/pkg/metrics" "github.com/knative/pkg/system" @@ -33,8 +29,8 @@ import ( "github.com/knative/serving/pkg/autoscaler" "github.com/knative/serving/pkg/deployment" "github.com/knative/serving/pkg/metrics" - "github.com/knative/serving/pkg/queue" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -63,9 +59,8 @@ var ( queueReadinessProbe = &corev1.Probe{ Handler: corev1.Handler{ - HTTPGet: &corev1.HTTPGetAction{ - Port: intstr.FromInt(networking.QueueAdminPort), - Path: queue.RequestQueueHealthPath, + Exec: &corev1.ExecAction{ + Command: []string{"/ko-app/queue", "-probe", "true"}, }, }, // We want to mark the service as not ready as soon as the