Skip to content

Commit

Permalink
use exec probe for queue proxy readiness check (knative#4148)
Browse files Browse the repository at this point in the history
* use exec probe for queue proxy readiness check

Replaces the HTTPGet ReadinessProbe with an ExecAction. The ExecAction
recreates the HTTPGet semantics within the queue proxy container and hits
the same health check HTTP path as before. This approach was taken
because the queue proxy process maintains state on whether it's active.

* use descriptive name for error, pass port as int to probeQueueHealthPath()

* pull probe timeout into constant, tighten up retry interval, clean up imports

* add comment explaining queue probe retry interval

* remove out of date comment

* rename queueProbeTimeout, use constant in probeUserContainer
  • Loading branch information
joshrider authored and hohaichi committed Jun 25, 2019
1 parent 23199de commit 3b5d141
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 12 deletions.
64 changes: 60 additions & 4 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"strings"
"time"

"go.uber.org/zap"

"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/metrics"
"github.com/knative/pkg/signals"
Expand All @@ -45,7 +43,8 @@ import (
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
queuestats "github.com/knative/serving/pkg/queue/stats"

"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand All @@ -64,6 +63,10 @@ const (
// in the mesh.
quitSleepDuration = 20 * time.Second

// Set equal to the queue-proxy's ExecProbe timeout to take
// advantage of the full window
probeTimeout = 10 * time.Second

badProbeTemplate = "unexpected probe header value: %s"
)

Expand Down Expand Up @@ -92,6 +95,8 @@ var (

healthState = &health.State{}
promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter.

probe = flag.Bool("probe", false, "run readiness probe")
)

func initEnv() {
Expand Down Expand Up @@ -145,7 +150,7 @@ func knativeProxyHeader(r *http.Request) string {

func probeUserContainer() bool {
var err error
wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) {
wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) {
logger.Debug("TCP probing the user-container.")
err = health.TCPProbe(userTargetAddress, 100*time.Millisecond)
return err == nil, nil
Expand Down Expand Up @@ -218,8 +223,59 @@ func createAdminHandlers() *http.ServeMux {
return mux
}

// probeQueueHealthPath repeatedly issues an HTTP GET against the queue-proxy
// health path on 127.0.0.1:<port>, via wait.PollImmediate bounded by
// probeTimeout, and stops as soon as a response with http.StatusOK is seen.
//
// It returns nil when a 200 response was received. If the most recent GET
// ended with an error, that error is returned wrapped with "failed to probe".
// Otherwise, if polling ended without ever seeing a 200, a
// "probe returned not ready" error is returned.
func probeQueueHealthPath(port int) error {
	target := fmt.Sprintf("http://127.0.0.1:%d%s", port, queue.RequestQueueHealthPath)

	client := &http.Client{
		Timeout: probeTimeout,
		// Keep-alives are disabled so that a cached connection is never reused
		// between attempts.
		Transport: &http.Transport{DisableKeepAlives: true},
	}

	// getErr is overwritten on every attempt, so after polling it reflects
	// only the outcome of the final GET.
	var getErr error

	attempt := func() (bool, error) {
		resp, err := client.Get(target)
		getErr = err

		if resp == nil {
			// No response at all; keep polling.
			return false, nil
		}

		ready := resp.StatusCode == http.StatusOK
		resp.Body.Close()
		return ready, nil
	}

	// The 25 millisecond retry interval is an unscientific compromise between
	// wanting to get started as early as possible while still wanting to give
	// the container some breathing room to get up and running.
	pollErr := wait.PollImmediate(25*time.Millisecond, probeTimeout, attempt)

	switch {
	case getErr != nil:
		return errors.Wrap(getErr, "failed to probe")
	case pollErr != nil:
		// An http.StatusOK was never returned during probing.
		return errors.New("probe returned not ready")
	default:
		return nil
	}
}

func main() {
flag.Parse()

if *probe {
if err := probeQueueHealthPath(networking.QueueAdminPort); err != nil {
// used instead of the logger to produce a concise event message
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

os.Exit(0)
}

logger, _ = logging.NewLogger(os.Getenv("SERVING_LOGGING_CONFIG"), os.Getenv("SERVING_LOGGING_LEVEL"))
logger = logger.Named("queueproxy")
defer flush(logger)
Expand Down
87 changes: 87 additions & 0 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -154,3 +155,89 @@ func TestCreateVarLogLink(t *testing.T) {
t.Errorf("Incorrect symlink = %q, want %q, diff: %s", got, want, cmp.Diff(got, want))
}
}

// TestProbeQueueConnectionFailure verifies that probeQueueHealthPath returns
// an error when the probed port has no listener behind it.
func TestProbeQueueConnectionFailure(t *testing.T) {
	// Start and immediately close a test server to obtain a port that has just
	// been released, instead of guessing at a fixed port number that another
	// process on the test host might happen to be listening on.
	ts := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
	portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:")
	ts.Close()

	port, err := strconv.Atoi(portStr)
	if err != nil {
		t.Fatalf("failed to convert port(%s) to int", portStr)
	}

	if err := probeQueueHealthPath(port); err == nil {
		t.Error("Expected error, got nil")
	}
}

// TestProbeQueueNotReady verifies that probeQueueHealthPath reports
// "probe returned not ready" when the health endpoint is reachable but only
// ever answers with a non-200 status.
func TestProbeQueueNotReady(t *testing.T) {
	queueProbed := false
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		queueProbed = true
		w.WriteHeader(http.StatusBadRequest)
	}))

	defer ts.Close()

	portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:")

	port, err := strconv.Atoi(portStr)
	if err != nil {
		t.Fatalf("failed to convert port(%s) to int", portStr)
	}

	err = probeQueueHealthPath(port)

	// Guard against a nil error before calling err.Error(); otherwise a
	// regression that makes the probe succeed would panic the test binary
	// instead of producing a readable failure.
	if err == nil {
		t.Fatalf("probeQueueHealthPath(%d) = nil, want a not ready error", port)
	}

	if diff := cmp.Diff("probe returned not ready", err.Error()); diff != "" {
		t.Errorf("Unexpected not ready message (-want +got): %s", diff)
	}

	if !queueProbed {
		t.Error("Expected the queue proxy server to be probed")
	}
}

// TestProbeQueueReady verifies that probeQueueHealthPath returns nil when the
// health endpoint answers with http.StatusOK, and that the endpoint was
// actually hit.
func TestProbeQueueReady(t *testing.T) {
	var probed bool
	handler := func(w http.ResponseWriter, _ *http.Request) {
		probed = true
		w.WriteHeader(http.StatusOK)
	}

	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	// The test server URL has the form http://127.0.0.1:<port>; keep only the port.
	rawPort := strings.TrimPrefix(srv.URL, "http://127.0.0.1:")
	port, convErr := strconv.Atoi(rawPort)
	if convErr != nil {
		t.Fatalf("failed to convert port(%s) to int", rawPort)
	}

	probeErr := probeQueueHealthPath(port)
	if probeErr != nil {
		t.Errorf("probeQueueHealthPath(%d) = %s", port, probeErr)
	}

	if !probed {
		t.Errorf("Expected the queue proxy server to be probed")
	}
}

// TestProbeQueueDelayedReady verifies that probeQueueHealthPath keeps retrying
// through a run of non-200 responses and succeeds once the endpoint starts
// answering with http.StatusOK.
func TestProbeQueueDelayedReady(t *testing.T) {
	// rejected counts the requests answered so far with a non-ready status;
	// the first nine requests are rejected, every later one is accepted.
	rejected := 0
	handler := http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
		if rejected >= 9 {
			w.WriteHeader(http.StatusOK)
			return
		}

		w.WriteHeader(http.StatusBadRequest)
		rejected++
	})

	srv := httptest.NewServer(handler)
	defer srv.Close()

	// The test server URL has the form http://127.0.0.1:<port>; keep only the port.
	rawPort := strings.TrimPrefix(srv.URL, "http://127.0.0.1:")
	port, convErr := strconv.Atoi(rawPort)
	if convErr != nil {
		t.Fatalf("failed to convert port(%s) to int", rawPort)
	}

	probeErr := probeQueueHealthPath(port)
	if probeErr != nil {
		t.Errorf("probeQueueHealthPath(%d) = %s", port, probeErr)
	}
}
11 changes: 3 additions & 8 deletions pkg/reconciler/revision/resources/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
"math"
"strconv"

"k8s.io/apimachinery/pkg/api/resource"

"k8s.io/apimachinery/pkg/util/intstr"

"github.com/knative/pkg/logging"
pkgmetrics "github.com/knative/pkg/metrics"
"github.com/knative/pkg/ptr"
Expand All @@ -34,8 +30,8 @@ import (
"github.com/knative/serving/pkg/autoscaler"
"github.com/knative/serving/pkg/deployment"
"github.com/knative/serving/pkg/metrics"
"github.com/knative/serving/pkg/queue"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -64,9 +60,8 @@ var (

queueReadinessProbe = &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(networking.QueueAdminPort),
Path: queue.RequestQueueHealthPath,
Exec: &corev1.ExecAction{
Command: []string{"/ko-app/queue", "-probe", "true"},
},
},
// We want to mark the service as not ready as soon as the
Expand Down

0 comments on commit 3b5d141

Please sign in to comment.