diff --git a/config/core/configmaps/deployment.yaml b/config/core/configmaps/deployment.yaml index c95559df3cea..37472b983f7d 100644 --- a/config/core/configmaps/deployment.yaml +++ b/config/core/configmaps/deployment.yaml @@ -22,7 +22,7 @@ metadata: app.kubernetes.io/component: controller app.kubernetes.io/version: devel annotations: - knative.dev/example-checksum: "ac007ed2" + knative.dev/example-checksum: "410041a0" data: # This is the Go import path for the binary that is containerized # and substituted here. @@ -88,26 +88,3 @@ data: # Sets rootCA for the queue proxy - used by QPOptions # If omitted, or empty, no rootCA is added to the golang rootCAs queue-sidecar-rootca: "" - - # The freezer service endpoint that queue-proxy calls when its traffic drops to zero or - # scales up from zero. - # - # Freezer service is available at: https://github.com/knative-sandbox/container-freezer - # or users may write their own service. - # - # The value will need to include both the host and the port that will be accessed. - # For the host, $HOST_IP can be passed, and the appropriate host IP value will be swapped - # in at runtime, which will enable the freezer daemonset to be reachable via the node IP. - # - # As an example: - # concurrency-state-endpoint: "http://$HOST_IP:9696" - # - # If not set, queue proxy takes no action (this is the default behavior). - # - # When enabled, a serviceAccountToken will be mounted to queue-proxy using - # a projected volume. This requires the Service Account Token Volume Projection feature - # to be enabled. For details, see this link: - # https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-token-volume-projection - # - # NOTE THAT THIS IS AN EXPERIMENTAL / ALPHA FEATURE - concurrency-state-endpoint: "" diff --git a/pkg/deployment/config.go b/pkg/deployment/config.go index 18284d3681e9..c2014195ad09 100644 --- a/pkg/deployment/config.go +++ b/pkg/deployment/config.go @@ -68,9 +68,6 @@ const ( // qpoptions queueSidecarTokenAudiencesKey = "queue-sidecar-token-audiences" queueSidecarRooCAKey = "queue-sidecar-rootca" - - // concurrencyStateEndpointKey is the key to configure the endpoint Queue Proxy will call when traffic drops to / increases from zero. - concurrencyStateEndpointKey = "concurrency-state-endpoint" ) var ( @@ -86,7 +83,6 @@ func defaultConfig() *Config { DigestResolutionTimeout: digestResolutionTimeoutDefault, RegistriesSkippingTagResolving: sets.NewString("kind.local", "ko.local", "dev.local"), QueueSidecarCPURequest: &QueueSidecarCPURequestDefault, - ConcurrencyStateEndpoint: "", } // The following code is needed for ConfigMap testing. // defaultConfig must match the example in deployment.yaml which includes: `queue-sidecar-token-audiences: ""` @@ -113,7 +109,6 @@ func NewConfigFromMap(configMap map[string]string) (*Config, error) { cm.AsQuantity("queueSidecarCPULimit", &nc.QueueSidecarCPULimit), cm.AsQuantity("queueSidecarMemoryLimit", &nc.QueueSidecarMemoryLimit), cm.AsQuantity("queueSidecarEphemeralStorageLimit", &nc.QueueSidecarEphemeralStorageLimit), - cm.AsString("concurrencyStateEndpoint", &nc.ConcurrencyStateEndpoint), cm.AsString(QueueSidecarImageKey, &nc.QueueSidecarImage), cm.AsDuration(ProgressDeadlineKey, &nc.ProgressDeadline), @@ -129,8 +124,6 @@ func NewConfigFromMap(configMap map[string]string) (*Config, error) { cm.AsStringSet(queueSidecarTokenAudiencesKey, &nc.QueueSidecarTokenAudiences), cm.AsString(queueSidecarRooCAKey, &nc.QueueSidecarRootCA), - - cm.AsString(concurrencyStateEndpointKey, &nc.ConcurrencyStateEndpoint), ); err != nil { return nil, err } @@ -201,7 +194,4 @@ type Config struct { // QueueSidecarRootCA is a root certificate to be trusted by the queue proxy sidecar qpoptions. QueueSidecarRootCA string - - // ConcurrencyStateEndpoint is the endpoint Queue Proxy will call when traffic drops to / increases from zero. - ConcurrencyStateEndpoint string } diff --git a/pkg/deployment/config_test.go b/pkg/deployment/config_test.go index 9714b5ff9eb9..ed2108422ded 100644 --- a/pkg/deployment/config_test.go +++ b/pkg/deployment/config_test.go @@ -200,21 +200,6 @@ func TestControllerConfiguration(t *testing.T) { QueueSidecarImageKey: defaultSidecarImage, ProgressDeadlineKey: "1982ms", }, - }, { - name: "controller configuration with concurrency state endpoint", - wantConfig: &Config{ - RegistriesSkippingTagResolving: sets.NewString("kind.local", "ko.local", "dev.local"), - DigestResolutionTimeout: digestResolutionTimeoutDefault, - QueueSidecarImage: defaultSidecarImage, - QueueSidecarCPURequest: &QueueSidecarCPURequestDefault, - ProgressDeadline: ProgressDeadlineDefault, - QueueSidecarTokenAudiences: sets.NewString(""), - ConcurrencyStateEndpoint: "freeze-proxy", - }, - data: map[string]string{ - QueueSidecarImageKey: defaultSidecarImage, - concurrencyStateEndpointKey: "freeze-proxy", - }, }, { name: "legacy keys supported", data: map[string]string{ @@ -229,7 +214,6 @@ func TestControllerConfiguration(t *testing.T) { "queueSidecarMemoryLimit": "8M", "queueSidecarEphemeralStorageRequest": "9M", "queueSidecarEphemeralStorageLimit": "10M", - "concurrencyStateEndpoint": "11", }, wantConfig: &Config{ QueueSidecarImage: "1", @@ -242,7 +226,6 @@ func TestControllerConfiguration(t *testing.T) { QueueSidecarMemoryLimit: quantity("8M"), QueueSidecarEphemeralStorageRequest: quantity("9M"), QueueSidecarEphemeralStorageLimit: quantity("10M"), - ConcurrencyStateEndpoint: "11", QueueSidecarTokenAudiences: sets.NewString(""), }, }, { @@ -260,7 +243,6 @@ func TestControllerConfiguration(t *testing.T) { "queueSidecarEphemeralStorageRequest": "9M", "queueSidecarEphemeralStorageLimit": "10M", "queueSidecarTokens": "bar", - "concurrencyStateEndpoint": "11", QueueSidecarImageKey: "12", ProgressDeadlineKey: "13s", @@ -273,7 +255,6 @@ func TestControllerConfiguration(t *testing.T) { queueSidecarEphemeralStorageRequestKey: "20M", queueSidecarEphemeralStorageLimitKey: "21M", queueSidecarTokenAudiencesKey: "foo", - concurrencyStateEndpointKey: "22", }, wantConfig: &Config{ QueueSidecarImage: "12", @@ -287,7 +268,6 @@ func TestControllerConfiguration(t *testing.T) { QueueSidecarEphemeralStorageRequest: quantity("20M"), QueueSidecarEphemeralStorageLimit: quantity("21M"), QueueSidecarTokenAudiences: sets.NewString("foo"), - ConcurrencyStateEndpoint: "22", }, }} diff --git a/pkg/queue/concurrency_state.go b/pkg/queue/concurrency_state.go deleted file mode 100644 index 5a3f1aeb7950..000000000000 --- a/pkg/queue/concurrency_state.go +++ /dev/null @@ -1,172 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package queue - -import ( - "bytes" - "fmt" - "net/http" - "os" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/wait" - - "go.uber.org/atomic" - "go.uber.org/zap" -) - -// ConcurrencyStateHandler tracks the in flight requests for the pod. When the requests -// drop to zero, it runs the `pause` function, and when requests scale up from zero, it -// runs the `resume` function. If either of `pause` or `resume` are not passed, it runs -// the respective local function(s). The local functions are the expected behavior; the -// function parameters are enabled primarily for testing purposes. -func ConcurrencyStateHandler(logger *zap.SugaredLogger, h http.Handler, pause, resume func(*zap.SugaredLogger)) http.HandlerFunc { - var ( - inFlight = atomic.NewInt64(0) - paused = true - mux sync.RWMutex - ) - - return func(w http.ResponseWriter, r *http.Request) { - defer func() { - if inFlight.Dec() == 0 { - mux.Lock() - defer mux.Unlock() - // We need to doublecheck this since another request can have reached the - // handler meanwhile. We don't want to do anything in that case. - if !paused && inFlight.Load() == 0 { - logger.Debug("Requests dropped to zero") - pause(logger) - paused = true - logger.Debug("To-Zero request successfully processed") - } - } - }() - - inFlight.Inc() - - mux.RLock() - if !paused { - // General stable-state case. - defer mux.RUnlock() - h.ServeHTTP(w, r) - return - } - mux.RUnlock() - mux.Lock() - if !paused { // doubly-checked locking - // Another request raced us and resumed already. - defer mux.Unlock() - h.ServeHTTP(w, r) - return - } - - logger.Debug("Requests increased from zero") - resume(logger) - paused = false - logger.Debug("From-Zero request successfully processed") - mux.Unlock() - - h.ServeHTTP(w, r) - } -} - -// retryRequest takes a request function and repeatedly tries it for a specified period -// of time or until successful. If the request ultimately fails, it kills the container. -func retryRequest(logger *zap.SugaredLogger, requestHandler func(string) error, request string) { - var errReq error - retryFunc := func() (bool, error) { - errReq = requestHandler(request) - if errReq != nil { - logger.Errorw(fmt.Sprintf("%s request failed", request), zap.Error(errReq)) - } - return errReq == nil, nil - } - if err := wait.PollImmediate(time.Millisecond*200, 15*time.Minute, retryFunc); err != nil { - logger.Fatalw(fmt.Sprintf("%s request failed", request), zap.Error(err)) - os.Exit(1) - } -} - -type ConcurrencyEndpoint struct { - endpoint string - mountPath string - token atomic.Value -} - -func NewConcurrencyEndpoint(e, m string) *ConcurrencyEndpoint { - c := ConcurrencyEndpoint{ - endpoint: os.Expand(e, func(s string) string { - if s == "HOST_IP" { - return os.Getenv("HOST_IP") - } - return "$" + s // to not change what the user provides - }), - mountPath: m, - } - c.RefreshToken() - return &c -} - -// Pause freezes a container, retrying until either successful or a timeout is -// reached, at which point the container is killed -func (c *ConcurrencyEndpoint) Pause(logger *zap.SugaredLogger) { - retryRequest(logger, c.Request, "pause") -} - -// Resume thaws a container, retrying until either successful or a timeout is -// reached, at which point the container is killed -func (c *ConcurrencyEndpoint) Resume(logger *zap.SugaredLogger) { - retryRequest(logger, c.Request, "resume") -} - -func (c *ConcurrencyEndpoint) Request(action string) error { - bodyText := fmt.Sprintf(`{ "action": %q }`, action) - body := bytes.NewBufferString(bodyText) - req, err := http.NewRequest(http.MethodPost, c.endpoint, body) - if err != nil { - return fmt.Errorf("unable to create request: %w", err) - } - token := fmt.Sprint(c.token.Load()) - req.Header.Add("Token", token) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("unable to post request: %w", err) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("expected 200 response, got: %d: %s", resp.StatusCode, resp.Status) - } - return nil -} - -func (c *ConcurrencyEndpoint) Terminating(logger *zap.SugaredLogger) { - retryRequest(logger, c.Request, "resume") -} - -func (c *ConcurrencyEndpoint) RefreshToken() error { - token, err := os.ReadFile(c.mountPath) - if err != nil { - return fmt.Errorf("could not read token: %w", err) - } - c.token.Store(string(token)) - return nil -} - -func (c *ConcurrencyEndpoint) Endpoint() string { - return c.endpoint -} diff --git a/pkg/queue/concurrency_state_test.go b/pkg/queue/concurrency_state_test.go deleted file mode 100644 index 1386128ee339..000000000000 --- a/pkg/queue/concurrency_state_test.go +++ /dev/null @@ -1,404 +0,0 @@ -/* -Copyright 2021 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package queue - -import ( - "encoding/json" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "testing" - "time" - - "go.uber.org/atomic" - "go.uber.org/zap" - pkglogging "knative.dev/pkg/logging" - ltesting "knative.dev/pkg/logging/testing" - - netheader "knative.dev/networking/pkg/http/header" - netstats "knative.dev/networking/pkg/http/stats" -) - -func TestConcurrencyStateHandler(t *testing.T) { - paused := atomic.NewInt64(0) - resumed := atomic.NewInt64(0) - - handler := func(w http.ResponseWriter, r *http.Request) {} - logger := ltesting.TestLogger(t) - h := ConcurrencyStateHandler(logger, http.HandlerFunc(handler), func(*zap.SugaredLogger) { paused.Inc() }, func(*zap.SugaredLogger) { resumed.Inc() }) - - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target", nil)) - if got, want := paused.Load(), int64(1); got != want { - t.Errorf("Pause was called %d times, want %d times", got, want) - } - - if got, want := resumed.Load(), int64(1); got != want { - t.Errorf("Resume was called %d times, want %d times", got, want) - } - - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target", nil)) - if got, want := paused.Load(), int64(2); got != want { - t.Errorf("Pause was called %d times, want %d times", got, want) - } - - if got, want := resumed.Load(), int64(2); got != want { - t.Errorf("Resume was called %d times, want %d times", got, want) - } -} - -func TestConcurrencyStateHandlerParallelSubsumed(t *testing.T) { - paused := atomic.NewInt64(0) - resumed := atomic.NewInt64(0) - - req1 := make(chan struct{}) - handler := func(w http.ResponseWriter, r *http.Request) { - if r.URL.Query().Get("req") == "1" { - req1 <- struct{}{} // to know it's here. - req1 <- struct{}{} // to make it wait. - } - } - logger := ltesting.TestLogger(t) - h := ConcurrencyStateHandler(logger, http.HandlerFunc(handler), func(*zap.SugaredLogger) { paused.Inc() }, func(*zap.SugaredLogger) { resumed.Inc() }) - - go func() { - defer func() { req1 <- struct{}{} }() - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target?req=1", nil)) - }() - - <-req1 // Wait for req1 to arrive. - - // Send a second request, which can pass through. - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target", nil)) - - <-req1 // Allow req1 to pass. - <-req1 // Wait for req1 to finish. - - if got, want := paused.Load(), int64(1); got != want { - t.Errorf("Pause was called %d times, want %d times", got, want) - } - - if got, want := resumed.Load(), int64(1); got != want { - t.Errorf("Resume was called %d times, want %d times", got, want) - } -} - -func TestConcurrencyStateHandlerParallelOverlapping(t *testing.T) { - paused := atomic.NewInt64(0) - resumed := atomic.NewInt64(0) - - req1 := make(chan struct{}) - req2 := make(chan struct{}) - handler := func(w http.ResponseWriter, r *http.Request) { - if r.URL.Query().Get("req") == "1" { - req1 <- struct{}{} // to know it's here. - req1 <- struct{}{} // to make it wait. - } else { - req2 <- struct{}{} // to know it's here. - req2 <- struct{}{} // to make it wait. - } - } - logger := ltesting.TestLogger(t) - h := ConcurrencyStateHandler(logger, http.HandlerFunc(handler), func(*zap.SugaredLogger) { paused.Inc() }, func(*zap.SugaredLogger) { resumed.Inc() }) - - go func() { - defer func() { req1 <- struct{}{} }() - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target?req=1", nil)) - }() - - <-req1 // Wait for req1 to arrive. - - go func() { - defer func() { req2 <- struct{}{} }() - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target?req=2", nil)) - }() - - <-req2 // Wait for req2 to arrive - - <-req1 // Allow req1 to pass. - <-req1 // Wait for req1 to finish. - - <-req2 // Allow req2 to pass. - <-req2 // Wait for req2 to finish. - - if got, want := paused.Load(), int64(1); got != want { - t.Errorf("Pause was called %d times, want %d times", got, want) - } - - if got, want := resumed.Load(), int64(1); got != want { - t.Errorf("Resume was called %d times, want %d times", got, want) - } -} - -func TestConcurrencyStateTokenRefresh(t *testing.T) { - var token string - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - tk := r.Header.Get("Token") - if tk != token { - t.Errorf("incorrect token header, expected %s, got %s", token, tk) - } - })) - tokenPath := filepath.Join(t.TempDir(), "secret") - token = "0123456789" - if err := os.WriteFile(tokenPath, []byte(token), 0700); err != nil { - t.Fatal(err) - } - - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("pause"); err != nil { - t.Errorf("initial token check returned an error: %s", err) - } - - token = "abcdefghijklmnop" - if err := os.WriteFile(tokenPath, []byte(token), 0700); err != nil { - t.Fatal(err) - } - c.RefreshToken() - if err := c.Request("pause"); err != nil { - t.Errorf("updated token check returned an error: %s", err) - } -} - -func TestConcurrencyStateEndpoint(t *testing.T) { - hostIP := "123.4.56.789" - os.Setenv("HOST_IP", hostIP) - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - - // no substitution - endpoint := "http://test:1234" - c := NewConcurrencyEndpoint(endpoint, tokenPath) - if c.endpoint != endpoint { - t.Errorf("expected %s, got %s", endpoint, c.Endpoint()) - } - - // hostIP substitution - endpoint = "http://$HOST_IP:1234" - subEndpoint := "http://" + hostIP + ":1234" - c = NewConcurrencyEndpoint(endpoint, tokenPath) - if c.endpoint != subEndpoint { - t.Errorf("expected %s, got %s", subEndpoint, c.endpoint) - } - - // hostIP and no port - endpoint = "http://$HOST_IP" - c = NewConcurrencyEndpoint(endpoint, tokenPath) - if c.endpoint != "http://"+hostIP { - t.Errorf("expected %s, got %s", endpoint, c.Endpoint()) - } - - // non-hostIP not substituted - endpoint = "http://$SERVING_NAMESPACE:1234" - c = NewConcurrencyEndpoint(endpoint, tokenPath) - if c.endpoint != endpoint { - t.Errorf("expected %s, got %s", endpoint, c.endpoint) - } -} - -func TestConcurrencyStatePauseHeader(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - token := r.Header.Get("Token") - if token != "0123456789" { - t.Errorf("incorrect token header, expected '0123456789', got %s", token) - } - })) - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("pause"); err != nil { - t.Errorf("pause header check returned an error: %s", err) - } -} - -func TestConcurrencyStatePauseRequest(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - var m struct{ Action string } - err := json.NewDecoder(r.Body).Decode(&m) - if err != nil { - t.Errorf("unable to parse message body: %s", err) - } - if m.Action != "pause" { - t.Errorf("improper message body, expected 'pause' and got: %s", m.Action) - } - })) - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("pause"); err != nil { - t.Errorf("request test returned an error: %s", err) - } -} - -func TestConcurrencyStateBadResponse(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusBadRequest) - })) - defer ts.Close() - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("pause"); err == nil { - t.Errorf(`Request("pause") function did not return an error`) - } - if err := c.Request("resume"); err == nil { - t.Errorf(`Request("resume") function did not return an error`) - } -} - -func TestConcurrencyStateResumeHeader(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - token := r.Header.Get("Token") - if token != "0123456789" { - t.Errorf("incorrect token header, expected '0123456789', got %s", token) - } - })) - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("resume"); err != nil { - t.Errorf("resume header check returned an error: %s", err) - } -} - -func TestConcurrencyStateResumeRequest(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) - var m struct{ Action string } - err := json.NewDecoder(r.Body).Decode(&m) - if err != nil { - t.Errorf("unable to parse message body: %s", err) - } - if m.Action != "resume" { - t.Errorf("improper message body, expected 'resume' and got: %s", m.Action) - } - })) - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - if err := c.Request("resume"); err != nil { - t.Errorf("request test returned an error: %s", err) - } -} - -func TestConcurrencyStateErrorRetryOperation(t *testing.T) { - reqCnt := 0 - ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if reqCnt >= 2 { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusBadRequest) - } - reqCnt++ - })) - defer ts.Close() - - tokenPath := filepath.Join(t.TempDir(), "secret") - if err := os.WriteFile(tokenPath, []byte("0123456789"), 0700); err != nil { - t.Fatal(err) - } - c := NewConcurrencyEndpoint(ts.URL, tokenPath) - handler := func(w http.ResponseWriter, r *http.Request) {} - logger := ltesting.TestLogger(t) - h := ConcurrencyStateHandler(logger, http.HandlerFunc(handler), c.Pause, c.Resume) - - timeNow := time.Now() - h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "http://target", nil)) - timeAfter := time.Now() - // why reqCnt is 4: when calling h.ServeHTTP, the server function will call resume, then call pause. - // it will call resume 3 times to make retry successful because of condition (reqCnt >= 2), then call - // pause, so it's 4 times in together. - // why time cost can't be less than 400ms: when the first resume failed, it will retry 2 times again, so the time cost - // is time interval multiplied by 2. - if timeAfter.Sub(timeNow) < (time.Millisecond*400) || reqCnt != 4 { - t.Errorf("fail to retry correct times") - } -} - -func BenchmarkConcurrencyStateProxyHandler(b *testing.B) { - logger, _ := pkglogging.NewLogger("", "error") - baseHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) - stats := netstats.NewRequestStats(time.Now()) - - req := httptest.NewRequest(http.MethodPost, "http://example.com", nil) - req.Header.Set(netheader.OriginalHostKey, wantHost) - - tests := []struct { - label string - breaker *Breaker - reportPeriod time.Duration - }{{ - label: "breaker-10-no-reports", - breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}), - reportPeriod: time.Hour, - }, { - label: "breaker-infinite-no-reports", - breaker: nil, - reportPeriod: time.Hour, - }, { - label: "breaker-10-many-reports", - breaker: NewBreaker(BreakerParams{QueueDepth: 10, MaxConcurrency: 10, InitialCapacity: 10}), - reportPeriod: time.Microsecond, - }, { - label: "breaker-infinite-many-reports", - breaker: nil, - reportPeriod: time.Microsecond, - }} - - for _, tc := range tests { - reportTicker := time.NewTicker(tc.reportPeriod) - - pause := func(*zap.SugaredLogger) {} - resume := func(*zap.SugaredLogger) {} - - h := ConcurrencyStateHandler(logger, ProxyHandler(tc.breaker, stats, true /*tracingEnabled*/, baseHandler), pause, resume) - b.Run("sequential-"+tc.label, func(b *testing.B) { - resp := httptest.NewRecorder() - for j := 0; j < b.N; j++ { - h(resp, req) - } - }) - b.Run("parallel-"+tc.label, func(b *testing.B) { - b.RunParallel(func(pb *testing.PB) { - resp := httptest.NewRecorder() - for pb.Next() { - h(resp, req) - } - }) - }) - - reportTicker.Stop() - } -} diff --git a/pkg/queue/constants.go b/pkg/queue/constants.go index 2e405ce78a7d..e2c104b34377 100644 --- a/pkg/queue/constants.go +++ b/pkg/queue/constants.go @@ -37,9 +37,6 @@ const ( // TokenDirectory is the name of the directory path where tokens are stored. TokenDirectory = "/var/run/secrets/tokens" - // ConcurrencyStateTokenFilename is the file name of the concurency state in TokenDirectory. - ConcurrencyStateTokenFilename = "state-token" - // PodInfoAnnotationsFilename is the file name of the annotations in PodInfoDirectory. PodInfoAnnotationsFilename = "annotations" ) diff --git a/pkg/queue/sharedmain/handlers.go b/pkg/queue/sharedmain/handlers.go index 231ec9bf0622..ce925c5243a8 100644 --- a/pkg/queue/sharedmain/handlers.go +++ b/pkg/queue/sharedmain/handlers.go @@ -43,7 +43,6 @@ func mainHandler( prober func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger, - ce *queue.ConcurrencyEndpoint, ) (http.Handler, *pkghandler.Drainer) { target := net.JoinHostPort("127.0.0.1", env.UserPort) @@ -55,7 +54,6 @@ func mainHandler( breaker := buildBreaker(logger, env) tracingEnabled := env.TracingConfigBackend != tracingconfig.None - concurrencyStateEnabled := env.ConcurrencyStateEndpoint != "" timeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second var responseStartTimeout = 0 * time.Second if env.RevisionResponseStartTimeoutSeconds != 0 { @@ -68,17 +66,6 @@ func mainHandler( // Create queue handler chain. // Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first. var composedHandler http.Handler = httpProxy - if concurrencyStateEnabled { - logger.Info("Concurrency state endpoint set, tracking request counts, using endpoint: ", ce.Endpoint()) - go func() { - for range time.NewTicker(1 * time.Minute).C { - ce.RefreshToken() - } - }() - composedHandler = queue.ConcurrencyStateHandler(logger, composedHandler, ce.Pause, ce.Resume) - // start paused - ce.Pause(logger) - } metricsSupported := supportsMetrics(ctx, logger, env) if metricsSupported { diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go index a98e44ca6a8a..e5fedffe755c 100644 --- a/pkg/queue/sharedmain/main.go +++ b/pkg/queue/sharedmain/main.go @@ -103,10 +103,6 @@ type config struct { TracingConfigSampleRate float64 `split_words:"true"` // optional TracingConfigZipkinEndpoint string `split_words:"true"` // optional - // Concurrency State Endpoint configuration - ConcurrencyStateEndpoint string `split_words:"true"` // optional - ConcurrencyStateTokenPath string `split_words:"true"` // optional - Env } @@ -224,22 +220,15 @@ func Main(opts ...Option) error { }() // Setup probe to run for checking user-application healthiness. - // Do not set up probe if concurrency state endpoint is set, as - // paused containers don't play well with k8s readiness probes. probe := func() bool { return true } - if env.ServingReadinessProbe != "" && env.ConcurrencyStateEndpoint == "" { + if env.ServingReadinessProbe != "" { probe = buildProbe(logger, env.ServingReadinessProbe, env.EnableHTTP2AutoDetection).ProbeContainer } - var concurrencyendpoint *queue.ConcurrencyEndpoint - if env.ConcurrencyStateEndpoint != "" { - concurrencyendpoint = queue.NewConcurrencyEndpoint(env.ConcurrencyStateEndpoint, env.ConcurrencyStateTokenPath) - } - // Enable TLS when certificate is mounted. tlsEnabled := exists(logger, certPath) && exists(logger, keyPath) - mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint) + mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger) adminHandler := adminHandler(d.Ctx, logger, drainer) // Enable TLS server when activator server certs are mounted. @@ -297,9 +286,6 @@ func Main(opts ...Option) error { logger.Errorw("Failed to bring up queue-proxy, shutting down.", zap.Error(err)) return err case <-d.Ctx.Done(): - if env.ConcurrencyStateEndpoint != "" { - concurrencyendpoint.Terminating(logger) - } logger.Info("Received TERM signal, attempting to gracefully shutdown servers.") logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration) drainer.Drain() diff --git a/pkg/reconciler/revision/resources/deploy.go b/pkg/reconciler/revision/resources/deploy.go index b2aec046792e..636ee50fa92e 100644 --- a/pkg/reconciler/revision/resources/deploy.go +++ b/pkg/reconciler/revision/resources/deploy.go @@ -43,7 +43,6 @@ import ( ) const certVolumeName = "server-certs" -const concurrencyStateHook = "concurrency-state-hook" var ( varLogVolume = corev1.Volume{ @@ -179,12 +178,6 @@ func makePodSpec(rev *v1.Revision, cfg *config.Config) (*corev1.PodSpec, error) extraVolumes = append(extraVolumes, varPodInfoVolume) } - // If concurrencyStateEndpoint is enabled, add the serviceAccountToken to QP via a projected volume - if cfg.Deployment.ConcurrencyStateEndpoint != "" { - // add token for audience "concurrency-state-hook" under filename ConcurrencyStateTokenFilename - addToken(tokenVolume, queue.ConcurrencyStateTokenFilename, concurrencyStateHook, ptr.Int64(600)) - } - audiences := make([]string, 0, len(cfg.Deployment.QueueSidecarTokenAudiences)) for k := range cfg.Deployment.QueueSidecarTokenAudiences { audiences = append(audiences, k) diff --git a/pkg/reconciler/revision/resources/deploy_test.go b/pkg/reconciler/revision/resources/deploy_test.go index cce8d106c341..2b27f3c07dc5 100644 --- a/pkg/reconciler/revision/resources/deploy_test.go +++ b/pkg/reconciler/revision/resources/deploy_test.go @@ -177,12 +177,6 @@ var ( }, { Name: "METRICS_COLLECTOR_ADDRESS", Value: "", - }, { - Name: "CONCURRENCY_STATE_ENDPOINT", - Value: "", - }, { - Name: "CONCURRENCY_STATE_TOKEN_PATH", - Value: "/var/run/secrets/tokens/state-token", }, { Name: "HOST_IP", ValueFrom: &corev1.EnvVarSource{ @@ -1343,41 +1337,6 @@ func TestMakePodSpec(t *testing.T) { ), }, ), - }, { - name: "concurrency state projected volume", - dc: deployment.Config{ - ConcurrencyStateEndpoint: "freeze-proxy", - }, - rev: revision("bar", "foo", - withContainers([]corev1.Container{{ - Name: servingContainerName, - Image: "busybox", - ReadinessProbe: withTCPReadinessProbe(v1.DefaultUserPort), - Ports: buildContainerPorts(v1.DefaultUserPort), - }}), - WithContainerStatuses([]v1.ContainerStatus{{ - ImageDigest: "busybox@sha256:deadbeef", - }, { - ImageDigest: "ubuntu@sha256:deadbeef", - }}), - ), - want: podSpec( - []corev1.Container{ - servingContainer(func(container *corev1.Container) { - container.Image = "busybox@sha256:deadbeef" - }), - queueContainer(func(container *corev1.Container) { - container.VolumeMounts = []corev1.VolumeMount{{ - Name: varTokenVolume.Name, - MountPath: "/var/run/secrets/tokens", - }} - }, - withEnvVar("CONCURRENCY_STATE_ENDPOINT", `freeze-proxy`), - withEnvVar("CONCURRENCY_STATE_TOKEN_PATH", `/var/run/secrets/tokens/state-token`), - ), - }, - withAppendedTokenVolumes([]appendTokenVolume{{filename: queue.ConcurrencyStateTokenFilename, audience: concurrencyStateHook, expires: 600}}), - ), }} for _, test := range tests { diff --git a/pkg/reconciler/revision/resources/queue.go b/pkg/reconciler/revision/resources/queue.go index 0f53f8640bf5..7a5bc8d10cf1 100644 --- a/pkg/reconciler/revision/resources/queue.go +++ b/pkg/reconciler/revision/resources/queue.go @@ -19,7 +19,6 @@ package resources import ( "fmt" "math" - "path" "strconv" corev1 "k8s.io/api/core/v1" @@ -359,12 +358,6 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container }, { Name: "METRICS_COLLECTOR_ADDRESS", Value: cfg.Observability.MetricsCollectorAddress, - }, { - Name: "CONCURRENCY_STATE_ENDPOINT", - Value: cfg.Deployment.ConcurrencyStateEndpoint, - }, { - Name: "CONCURRENCY_STATE_TOKEN_PATH", - Value: path.Join(queue.TokenDirectory, queue.ConcurrencyStateTokenFilename), }, { Name: "HOST_IP", ValueFrom: &corev1.EnvVarSource{ diff --git a/pkg/reconciler/revision/resources/queue_test.go b/pkg/reconciler/revision/resources/queue_test.go index 8a85abf78ca7..dd0b1c51db77 100644 --- a/pkg/reconciler/revision/resources/queue_test.go +++ b/pkg/reconciler/revision/resources/queue_test.go @@ -370,19 +370,6 @@ func TestMakeQueueContainer(t *testing.T) { "ENABLE_HTTP2_AUTO_DETECTION": "true", }) }), - }, { - name: "set concurrency state endpoint", - rev: revision("bar", "foo", - withContainers(containers)), - dc: deployment.Config{ - ConcurrencyStateEndpoint: "freeze-proxy", - }, - want: queueContainer(func(c *corev1.Container) { - c.Env = env(map[string]string{ - "CONCURRENCY_STATE_ENDPOINT": "freeze-proxy", - "CONCURRENCY_STATE_TOKEN_PATH": "/var/run/secrets/tokens/state-token", - }) - }), }, { name: "set root ca", rev: revision("bar", "foo", @@ -916,8 +903,6 @@ func TestTCPProbeGeneration(t *testing.T) { } var defaultEnv = map[string]string{ - "CONCURRENCY_STATE_ENDPOINT": "", - "CONCURRENCY_STATE_TOKEN_PATH": "/var/run/secrets/tokens/state-token", "CONTAINER_CONCURRENCY": "0", "ENABLE_HTTP2_AUTO_DETECTION": "false", "ENABLE_PROFILING": "false",