Skip to content

Commit

Permalink
[RELEASE-1.11] [SRVKS-661][Backport] Support http1 full duplex per wo…
Browse files Browse the repository at this point in the history
…rkload (knative#14568) (knative#591)

* Support http1 full duplex per workload (knative#14568)

* support http1 full duplex per workload

* lint

* style

* single call

* updates & unit test

* fix unit test

* fix comment

* address review comments

* fix lint

* skip for Mac

* Create less load for TestActivatorChainHandlerWithFullDuplex (knative#14820)

* less load

* limit cons
  • Loading branch information
skonto committed Jan 26, 2024
1 parent 0b7fda1 commit dfc75d4
Show file tree
Hide file tree
Showing 12 changed files with 245 additions and 2 deletions.
3 changes: 2 additions & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,13 @@ func main() {
// NOTE: MetricHandler is being used as the outermost handler of the meaty bits. We're not interested in measuring
// the healthchecks or probes.
ah = activatorhandler.NewMetricHandler(env.PodName, ah)
// We need the context handler to run first so ctx gets the revision info.
ah = activatorhandler.WrapActivatorHandlerWithFullDuplex(ah, logger)
ah = activatorhandler.NewContextHandler(ctx, ah, configStore)

// Network probe handlers.
ah = &activatorhandler.ProbeHandler{NextHandler: ah}
ah = netprobe.NewHandler(ah)

// Set up our health check based on the health of stat sink and environmental factors.
sigCtx := signals.NewContext()
hc := newHealthCheck(sigCtx, logger, statSink)
Expand Down
12 changes: 12 additions & 0 deletions pkg/activator/handler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,15 @@ func RevisionFrom(ctx context.Context) *v1.Revision {
func RevIDFrom(ctx context.Context) types.NamespacedName {
return ctx.Value(revCtxKey{}).(*revCtx).revID
}

func RevAnnotation(ctx context.Context, annotation string) string {
v := ctx.Value(revCtxKey{})
if v == nil {
return ""
}
rev := v.(*revCtx).revision
if rev != nil && rev.GetAnnotations() != nil {
return rev.GetAnnotations()[annotation]
}
return ""
}
14 changes: 14 additions & 0 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"knative.dev/pkg/tracing/propagation/tracecontextb3"
"knative.dev/serving/pkg/activator"
activatorconfig "knative.dev/serving/pkg/activator/config"
apiconfig "knative.dev/serving/pkg/apis/config"
pkghttp "knative.dev/serving/pkg/http"
"knative.dev/serving/pkg/networking"
"knative.dev/serving/pkg/queue"
Expand Down Expand Up @@ -150,3 +151,16 @@ func useSecurePort(target string) string {
target = strings.Split(target, ":")[0]
return target + ":" + strconv.Itoa(networking.BackendHTTPSPort)
}

func WrapActivatorHandlerWithFullDuplex(h http.Handler, logger *zap.SugaredLogger) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
revEnableHTTP1FullDuplex := strings.EqualFold(RevAnnotation(r.Context(), apiconfig.AllowHTTPFullDuplexFeatureKey), "Enabled")
if revEnableHTTP1FullDuplex {
rc := http.NewResponseController(w)
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
}
h.ServeHTTP(w, r)
})
}
166 changes: 166 additions & 0 deletions pkg/activator/handler/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@ package handler

import (
"bytes"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"net/url"
"runtime"
"sync"
"testing"

netprobe "knative.dev/networking/pkg/http/probe"
"knative.dev/pkg/logging"
pkgnet "knative.dev/pkg/network"
rtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/serving/pkg/activator"
apiconfig "knative.dev/serving/pkg/apis/config"
asmetrics "knative.dev/serving/pkg/autoscaler/metrics"
pkghttp "knative.dev/serving/pkg/http"
)
Expand Down Expand Up @@ -114,3 +120,163 @@ func BenchmarkHandlerChain(b *testing.B) {
})
})
}

// TestActivatorChainHandlerWithFullDuplex tests activator's chain handler with the new http1 full duplex support against the issue
// https://github.com/golang/go/issues/40747, where reverse proxy failed with read errors.
// The test uses the reproducer in https://github.com/golang/go/issues/40747#issuecomment-733552061.
// We enable full duplex by setting the annotation `features.knative.dev/http-full-duplex` at the revision level.
func TestActivatorChainHandlerWithFullDuplex(t *testing.T) {
if runtime.GOOS == "darwin" {
t.Skip("Testing this on Mac requires to loosen some restrictions, see https://github.com/knative/serving/pull/14568#issuecomment-1893151202 for more")
}

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
rev := revision(testNamespace, testRevName)
defer reset()
rev.Annotations = map[string]string{apiconfig.AllowHTTPFullDuplexFeatureKey: "Enabled"}
t.Cleanup(cancel)

logger := logging.FromContext(ctx)
configStore := setupConfigStore(t, logger)
revisionInformer(ctx, rev)

// Buffer equal to the activator.
statCh := make(chan []asmetrics.StatMessage)
concurrencyReporter := NewConcurrencyReporter(ctx, activatorPodName, statCh)
go concurrencyReporter.Run(ctx.Done())

// Just read and ignore all stat messages.
go func() {
for {
select {
case <-statCh:
case <-ctx.Done():
return
}
}
}()

// The server responding with the sent body.
echoServer := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, req *http.Request) {
body, err := io.ReadAll(req.Body)
if err != nil {
log.Printf("error reading body: %v", err)
http.Error(w, fmt.Sprintf("error reading body: %v", err), http.StatusInternalServerError)
return
}

if _, err := w.Write(body); err != nil {
log.Printf("error writing body: %v", err)
}
},
))
defer echoServer.Close()

// The server proxying requests to the echo server.
echoURL, err := url.Parse(echoServer.URL)
if err != nil {
t.Fatalf("Failed to parse echo URL: %v", err)
}

proxy := pkghttp.NewHeaderPruningReverseProxy(echoURL.Host, "", []string{}, false)
proxy.FlushInterval = 0
proxyWithMiddleware := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
proxy.ServeHTTP(w, r)
})
var ah http.Handler
ah = concurrencyReporter.Handler(proxyWithMiddleware)
ah = NewTracingHandler(ah)
ah, _ = pkghttp.NewRequestLogHandler(ah, io.Discard, "", nil, false)
ah = NewMetricHandler(activatorPodName, ah)
ah = WrapActivatorHandlerWithFullDuplex(ah, logger)
ah = NewContextHandler(ctx, ah, configStore)
ah = &ProbeHandler{NextHandler: ah}
ah = netprobe.NewHandler(ah)
ah = &HealthHandler{HealthCheck: func() error { return nil }, NextHandler: ah, Logger: logger}

bodySize := 32 * 1024
parallelism := 32

proxyServer := httptest.NewServer(ah)

defer proxyServer.Close()

transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = 10
transport.MaxIdleConns = 100

// Turning on this will hide the issue
transport.DisableKeepAlives = false
c := &http.Client{
Transport: transport,
}

body := make([]byte, bodySize)
for i := 0; i < cap(body); i++ {
body[i] = 42
}

for i := 0; i < 10; i++ {
var wg sync.WaitGroup
wg.Add(parallelism)
for i := 0; i < parallelism; i++ {
go func(i int) {
defer wg.Done()

for i := 0; i < 100; i++ {
if err := send(c, proxyServer.URL, body, "test-host"); err != nil {
t.Errorf("error during request: %v", err)
}
}
}(i)
}

wg.Wait()
}

}

func send(client *http.Client, url string, body []byte, rHost string) error {
r := bytes.NewBuffer(body)
req, err := http.NewRequest("POST", url, r)

if rHost != "" {
req.Host = rHost
}

req.Header.Set(activator.RevisionHeaderNamespace, testNamespace)
req.Header.Set(activator.RevisionHeaderName, testRevName)

if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to execute request: %w", err)
}
defer resp.Body.Close()

bd := io.Reader(resp.Body)

rec, err := io.ReadAll(bd)

if err != nil {
return fmt.Errorf("failed to read body: %w", err)
}

if _, err = io.Copy(io.Discard, resp.Body); err != nil {
return fmt.Errorf("failed to discard body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

if len(rec) != len(body) {
return fmt.Errorf("unexpected body length: %d", len(rec))
}

return nil
}
3 changes: 3 additions & 0 deletions pkg/apis/config/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ const (

// DryRunFeatureKey gates the podspec dryrun feature and runs with the value 'enabled'
DryRunFeatureKey = "features.knative.dev/podspec-dryrun"

// AllowHTTPFullDuplexFeatureKey gates the use of http1 full duplex per workload
AllowHTTPFullDuplexFeatureKey = "features.knative.dev/http-full-duplex"
)

func defaultFeaturesConfig() *Features {
Expand Down
5 changes: 5 additions & 0 deletions pkg/http/handler/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ type timeoutWriter struct {
var _ http.Flusher = (*timeoutWriter)(nil)
var _ http.ResponseWriter = (*timeoutWriter)(nil)

// Unwrap returns the underlying writer
func (tw *timeoutWriter) Unwrap() http.ResponseWriter {
return tw.w
}

func (tw *timeoutWriter) Flush() {
// The inner handler of timeoutHandler can call Flush at any time including after
// timeoutHandler.ServeHTTP has returned. Forwarding this call to the inner
Expand Down
5 changes: 5 additions & 0 deletions pkg/http/response_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func NewResponseRecorder(w http.ResponseWriter, responseCode int) *ResponseRecor
}
}

// Unwrap returns the underlying writer
func (rr *ResponseRecorder) Unwrap() http.ResponseWriter {
return rr.writer
}

// Flush flushes the buffer to the client.
func (rr *ResponseRecorder) Flush() {
rr.writer.(http.Flusher).Flush()
Expand Down
15 changes: 15 additions & 0 deletions pkg/queue/sharedmain/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func mainHandler(
composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
}

composedHandler = withFullDuplex(composedHandler, env.EnableHTTPFullDuplex, logger)

drainer := &pkghandler.Drainer{
QuietPeriod: drainSleepDuration,
// Add Activator probe header to the drainer so it can handle probes directly from activator
Expand Down Expand Up @@ -124,3 +126,16 @@ func adminHandler(ctx context.Context, logger *zap.SugaredLogger, drainer *pkgha

return mux
}

func withFullDuplex(h http.Handler, enableFullDuplex bool, logger *zap.SugaredLogger) http.Handler {
if !enableFullDuplex {
return h
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rc := http.NewResponseController(w)
if err := rc.EnableFullDuplex(); err != nil {
logger.Errorw("Unable to enable full duplex", zap.Error(err))
}
h.ServeHTTP(w, r)
})
}
4 changes: 3 additions & 1 deletion pkg/queue/sharedmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ type config struct {
RevisionIdleTimeoutSeconds int `split_words:"true"` // optional
ServingReadinessProbe string `split_words:"true"` // optional
EnableProfiling bool `split_words:"true"` // optional
EnableHTTP2AutoDetection bool `split_words:"true"` // optional
// See https://github.com/knative/serving/issues/12387
EnableHTTPFullDuplex bool `split_words:"true"` // optional
EnableHTTP2AutoDetection bool `envconfig:"ENABLE_HTTP2_AUTO_DETECTION"` // optional

// Logging configuration
ServingLoggingConfig string `split_words:"true" required:"true"`
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/revision/resources/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ var (
}, {
Name: "ENABLE_HTTP2_AUTO_DETECTION",
Value: "false",
}, {
Name: "ENABLE_HTTP_FULL_DUPLEX",
Value: "false",
}, {
Name: "ROOT_CA",
Value: "",
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/revision/resources/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -300,6 +301,8 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
}
}

fullDuplexFeature, fullDuplexExists := rev.Annotations[apicfg.AllowHTTPFullDuplexFeatureKey]

useQPResourceDefaults := cfg.Features.QueueProxyResourceDefaults == apicfg.Enabled
c := &corev1.Container{
Name: QueueContainerName,
Expand Down Expand Up @@ -415,6 +418,9 @@ func makeQueueContainer(rev *v1.Revision, cfg *config.Config) (*corev1.Container
}, {
Name: "ENABLE_HTTP2_AUTO_DETECTION",
Value: strconv.FormatBool(cfg.Features.AutoDetectHTTP2 == apicfg.Enabled),
}, {
Name: "ENABLE_HTTP_FULL_DUPLEX",
Value: strconv.FormatBool(fullDuplexExists && strings.EqualFold(fullDuplexFeature, string(apicfg.Enabled))),
}, {
Name: "ROOT_CA",
Value: cfg.Deployment.QueueSidecarRootCA,
Expand Down
11 changes: 11 additions & 0 deletions pkg/reconciler/revision/resources/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,16 @@ func TestMakeQueueContainer(t *testing.T) {
"ENABLE_HTTP2_AUTO_DETECTION": "true",
})
}),
}, {
name: "HTTP1 full duplex enabled",
rev: revision("bar", "foo",
withContainers(containers),
WithRevisionAnnotations(map[string]string{apicfg.AllowHTTPFullDuplexFeatureKey: string(apicfg.Enabled)})),
want: queueContainer(func(c *corev1.Container) {
c.Env = env(map[string]string{
"ENABLE_HTTP_FULL_DUPLEX": "true",
})
}),
}, {
name: "set root ca",
rev: revision("bar", "foo",
Expand Down Expand Up @@ -1041,6 +1051,7 @@ func TestTCPProbeGeneration(t *testing.T) {
var defaultEnv = map[string]string{
"CONTAINER_CONCURRENCY": "0",
"ENABLE_HTTP2_AUTO_DETECTION": "false",
"ENABLE_HTTP_FULL_DUPLEX": "false",
"ENABLE_PROFILING": "false",
"METRICS_DOMAIN": metrics.Domain(),
"METRICS_COLLECTOR_ADDRESS": "",
Expand Down

0 comments on commit dfc75d4

Please sign in to comment.