Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use exec probe for queue proxy readiness check #4148

Merged
merged 6 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 60 additions & 4 deletions cmd/queue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"strings"
"time"

"go.uber.org/zap"

"github.com/knative/pkg/logging/logkey"
"github.com/knative/pkg/metrics"
"github.com/knative/pkg/signals"
Expand All @@ -45,7 +43,8 @@ import (
"github.com/knative/serving/pkg/queue"
"github.com/knative/serving/pkg/queue/health"
queuestats "github.com/knative/serving/pkg/queue/stats"

"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand All @@ -64,6 +63,10 @@ const (
// in the mesh.
quitSleepDuration = 20 * time.Second

// Set equal to the queue-proxy's ExecProbe timeout to take
// advantage of the full window
probeTimeout = 10 * time.Second
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this a flag so it can be kept consistent with TimeoutSeconds in the probe configuration? Maybe we shouldn't do this until we pass the entire probe descriptor through?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the idea was that this was separate from the issue of passing in the full probe to have something user-defined applied via the queue-proxy. There are a bunch of things in this file that will change once we start passing all of the configuration in.


badProbeTemplate = "unexpected probe header value: %s"
)

Expand Down Expand Up @@ -92,6 +95,8 @@ var (

healthState = &health.State{}
promStatReporter *queue.PrometheusStatsReporter // Prometheus stats reporter.

probe = flag.Bool("probe", false, "run readiness probe")
)

func initEnv() {
Expand Down Expand Up @@ -145,7 +150,7 @@ func knativeProxyHeader(r *http.Request) string {

func probeUserContainer() bool {
var err error
wait.PollImmediate(50*time.Millisecond, 10*time.Second, func() (bool, error) {
wait.PollImmediate(50*time.Millisecond, probeTimeout, func() (bool, error) {
logger.Debug("TCP probing the user-container.")
err = health.TCPProbe(userTargetAddress, 100*time.Millisecond)
return err == nil, nil
Expand Down Expand Up @@ -218,8 +223,59 @@ func createAdminHandlers() *http.ServeMux {
return mux
}

func probeQueueHealthPath(port int) error {
url := fmt.Sprintf("http://127.0.0.1:%d%s", port, queue.RequestQueueHealthPath)

httpClient := &http.Client{
Transport: &http.Transport{
// Do not use the cached connection
DisableKeepAlives: true,
},
Timeout: probeTimeout,
}

var lastErr error

// The 25 millisecond retry interval is an unscientific compromise between wanting to get
// started as early as possible while still wanting to give the container some breathing
// room to get up and running.
timeoutErr := wait.PollImmediate(25*time.Millisecond, probeTimeout, func() (bool, error) {
var res *http.Response
res, lastErr = httpClient.Get(url)

if res == nil {
return false, nil
}

defer res.Body.Close()
return res.StatusCode == http.StatusOK, nil
})

if lastErr != nil {
return errors.Wrap(lastErr, "failed to probe")
}

// An http.StatusOK was never returned during probing
if timeoutErr != nil {
return errors.New("probe returned not ready")
}

return nil
}

func main() {
flag.Parse()

if *probe {
if err := probeQueueHealthPath(networking.QueueAdminPort); err != nil {
// used instead of the logger to produce a concise event message
fmt.Fprintln(os.Stderr, err)
tcnghia marked this conversation as resolved.
Show resolved Hide resolved
os.Exit(1)
}

os.Exit(0)
}

logger, _ = logging.NewLogger(os.Getenv("SERVING_LOGGING_CONFIG"), os.Getenv("SERVING_LOGGING_LEVEL"))
logger = logger.Named("queueproxy")
defer flush(logger)
Expand Down
87 changes: 87 additions & 0 deletions cmd/queue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/url"
"os"
"path"
"strconv"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -154,3 +155,89 @@ func TestCreateVarLogLink(t *testing.T) {
t.Errorf("Incorrect symlink = %q, want %q, diff: %s", got, want, cmp.Diff(got, want))
}
}

func TestProbeQueueConnectionFailure(t *testing.T) {
port := 12345 // some random port (that's not listening)

if err := probeQueueHealthPath(port); err == nil {
t.Error("Expected error, got nil")
}
}

func TestProbeQueueNotReady(t *testing.T) {
queueProbed := false
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
queueProbed = true
w.WriteHeader(http.StatusBadRequest)
}))

defer ts.Close()

portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:")

port, err := strconv.Atoi(portStr)
if err != nil {
t.Fatalf("failed to convert port(%s) to int", portStr)
}

err = probeQueueHealthPath(port)

if diff := cmp.Diff(err.Error(), "probe returned not ready"); diff != "" {
t.Errorf("Unexpected not ready message: %s", diff)
}

if !queueProbed {
t.Errorf("Expected the queue proxy server to be probed")
}
}

func TestProbeQueueReady(t *testing.T) {
queueProbed := false
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
queueProbed = true
w.WriteHeader(http.StatusOK)
}))

defer ts.Close()

portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:")

port, err := strconv.Atoi(portStr)
if err != nil {
t.Fatalf("failed to convert port(%s) to int", portStr)
}

if err = probeQueueHealthPath(port); err != nil {
t.Errorf("probeQueueHealthPath(%d) = %s", port, err)
}

if !queueProbed {
t.Errorf("Expected the queue proxy server to be probed")
}
}

func TestProbeQueueDelayedReady(t *testing.T) {
count := 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if count < 9 {
w.WriteHeader(http.StatusBadRequest)
count++
return
}

w.WriteHeader(http.StatusOK)
}))

defer ts.Close()

portStr := strings.TrimPrefix(ts.URL, "http://127.0.0.1:")

port, err := strconv.Atoi(portStr)
if err != nil {
t.Fatalf("failed to convert port(%s) to int", portStr)
}

if err := probeQueueHealthPath(port); err != nil {
t.Errorf("probeQueueHealthPath(%d) = %s", port, err)
}
}
11 changes: 3 additions & 8 deletions pkg/reconciler/revision/resources/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
"math"
"strconv"

"k8s.io/apimachinery/pkg/api/resource"

"k8s.io/apimachinery/pkg/util/intstr"

"github.com/knative/pkg/logging"
pkgmetrics "github.com/knative/pkg/metrics"
"github.com/knative/pkg/system"
Expand All @@ -33,8 +29,8 @@ import (
"github.com/knative/serving/pkg/autoscaler"
"github.com/knative/serving/pkg/deployment"
"github.com/knative/serving/pkg/metrics"
"github.com/knative/serving/pkg/queue"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -63,9 +59,8 @@ var (

queueReadinessProbe = &corev1.Probe{
Handler: corev1.Handler{
HTTPGet: &corev1.HTTPGetAction{
Port: intstr.FromInt(networking.QueueAdminPort),
Path: queue.RequestQueueHealthPath,
Exec: &corev1.ExecAction{
Command: []string{"/ko-app/queue", "-probe", "true"},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you confirm that it's our ultimate goal to:

  1. Pass the user HTTP/TCP probe through for the queue to evaluate
  2. Default the probe to a TCP probe

This seems like a reasonable first step, but I wanted to confirm we have a shared understanding of the end state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is the plan for #4014, but I don't know that this is necessary for that.

The hope was for this to address #3308 (with the possibility of maybe using it in #4014).

},
},
// We want to mark the service as not ready as soon as the
Expand Down