diff --git a/cmd/default-domain/main.go b/cmd/default-domain/main.go
index 17762695b30b..48c164a81910 100644
--- a/cmd/default-domain/main.go
+++ b/cmd/default-domain/main.go
@@ -192,7 +192,6 @@ func main() {
 	h := netprobe.NewHandler(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
 		w.WriteHeader(http.StatusOK)
 	}))
-	//nolinlt:gosec https://github.com/knative/serving/issues/13439
 	server := http.Server{Addr: ":8080", Handler: h, ReadHeaderTimeout: time.Minute}
 	go server.ListenAndServe()
 
diff --git a/pkg/autoscaler/statserver/server.go b/pkg/autoscaler/statserver/server.go
index bba9f098c7ec..18a90c1e3633 100644
--- a/pkg/autoscaler/statserver/server.go
+++ b/pkg/autoscaler/statserver/server.go
@@ -66,11 +66,11 @@ func New(statsServerAddr string, statsCh chan<- metrics.StatMessage, logger *zap
 	mux := http.NewServeMux()
 	mux.HandleFunc("/", svr.Handler)
 
-	//nolint:gosec // https://github.com/knative/serving/issues/13439
 	svr.wsSrv = http.Server{
-		Addr:      statsServerAddr,
-		Handler:   mux,
-		ConnState: svr.onConnStateChange,
+		Addr:              statsServerAddr,
+		Handler:           mux,
+		ConnState:         svr.onConnStateChange,
+		ReadHeaderTimeout: time.Minute, // https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
 	}
 	return &svr
 }
diff --git a/pkg/queue/sharedmain/handlers.go b/pkg/queue/sharedmain/handlers.go
new file mode 100644
index 000000000000..231ec9bf0622
--- /dev/null
+++ b/pkg/queue/sharedmain/handlers.go
@@ -0,0 +1,139 @@
+/*
+Copyright 2023 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sharedmain
+
+import (
+	"context"
+	"net"
+	"net/http"
+	"time"
+
+	"go.uber.org/zap"
+	netheader "knative.dev/networking/pkg/http/header"
+	netproxy "knative.dev/networking/pkg/http/proxy"
+	netstats "knative.dev/networking/pkg/http/stats"
+	pkghandler "knative.dev/pkg/network/handlers"
+	"knative.dev/pkg/tracing"
+	tracingconfig "knative.dev/pkg/tracing/config"
+	"knative.dev/serving/pkg/activator"
+	pkghttp "knative.dev/serving/pkg/http"
+	"knative.dev/serving/pkg/http/handler"
+	"knative.dev/serving/pkg/queue"
+	"knative.dev/serving/pkg/queue/health"
+)
+
+func mainHandler(
+	ctx context.Context,
+	env config,
+	transport http.RoundTripper,
+	prober func() bool,
+	stats *netstats.RequestStats,
+	logger *zap.SugaredLogger,
+	ce *queue.ConcurrencyEndpoint,
+) (http.Handler, *pkghandler.Drainer) {
+	target := net.JoinHostPort("127.0.0.1", env.UserPort)
+
+	httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
+	httpProxy.Transport = transport
+	httpProxy.ErrorHandler = pkghandler.Error(logger)
+	httpProxy.BufferPool = netproxy.NewBufferPool()
+	httpProxy.FlushInterval = netproxy.FlushInterval
+
+	breaker := buildBreaker(logger, env)
+	tracingEnabled := env.TracingConfigBackend != tracingconfig.None
+	concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
+	timeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
+	var responseStartTimeout = 0 * time.Second
+	if env.RevisionResponseStartTimeoutSeconds != 0 {
+		responseStartTimeout = time.Duration(env.RevisionResponseStartTimeoutSeconds) * time.Second
+	}
+	var idleTimeout = 0 * time.Second
+	if env.RevisionIdleTimeoutSeconds != 0 {
+		idleTimeout = time.Duration(env.RevisionIdleTimeoutSeconds) * time.Second
+	}
+	// Create queue handler chain.
+	// Note: innermost handlers are specified first, i.e. the last handler in the chain will be executed first.
+	var composedHandler http.Handler = httpProxy
+	if concurrencyStateEnabled {
+		logger.Info("Concurrency state endpoint set, tracking request counts, using endpoint: ", ce.Endpoint())
+		go func() {
+			for range time.NewTicker(1 * time.Minute).C {
+				ce.RefreshToken()
+			}
+		}()
+		composedHandler = queue.ConcurrencyStateHandler(logger, composedHandler, ce.Pause, ce.Resume)
+		// start paused
+		ce.Pause(logger)
+	}
+
+	metricsSupported := supportsMetrics(ctx, logger, env)
+	if metricsSupported {
+		composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, env)
+	}
+	composedHandler = queue.ProxyHandler(breaker, stats, tracingEnabled, composedHandler)
+	composedHandler = queue.ForwardedShimHandler(composedHandler)
+	composedHandler = handler.NewTimeoutHandler(composedHandler, "request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
+		return timeout, responseStartTimeout, idleTimeout
+	})
+
+	if metricsSupported {
+		composedHandler = requestMetricsHandler(logger, composedHandler, env)
+	}
+	if tracingEnabled {
+		composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
+	}
+
+	drainer := &pkghandler.Drainer{
+		QuietPeriod: drainSleepDuration,
+		// Add Activator probe header to the drainer so it can handle probes directly from activator
+		HealthCheckUAPrefixes: []string{netheader.ActivatorUserAgent, netheader.AutoscalingUserAgent},
+		Inner:                 composedHandler,
+		HealthCheck:           health.ProbeHandler(prober, tracingEnabled),
+	}
+	composedHandler = drainer
+
+	if env.ServingEnableRequestLog {
+		// We want to capture the probes/healthchecks in the request logs.
+		// Hence we need to have RequestLogHandler be the first one.
+		composedHandler = requestLogHandler(logger, composedHandler, env)
+	}
+	return composedHandler, drainer
+}
+
+func adminHandler(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer) http.Handler {
+	mux := http.NewServeMux()
+	mux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
+		logger.Info("Attached drain handler from user-container", r)
+
+		go func() {
+			select {
+			case <-ctx.Done():
+			case <-time.After(time.Second):
+				// If the context isn't done then the queue proxy didn't
+				// receive a TERM signal. Thus the user-container's
+				// liveness probes are triggering the container to restart
+				// and we shouldn't block that
+				drainer.Reset()
+			}
+		}()
+
+		drainer.Drain()
+		w.WriteHeader(http.StatusOK)
+	})
+
+	return mux
+}
diff --git a/pkg/queue/sharedmain/main.go b/pkg/queue/sharedmain/main.go
index 9b3c8e03005c..a98e44ca6a8a 100644
--- a/pkg/queue/sharedmain/main.go
+++ b/pkg/queue/sharedmain/main.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"net"
 	"net/http"
 	"os"
 	"strconv"
@@ -34,26 +33,20 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 
 	"knative.dev/control-protocol/pkg/certificates"
-	netheader "knative.dev/networking/pkg/http/header"
-	netproxy "knative.dev/networking/pkg/http/proxy"
 	netstats "knative.dev/networking/pkg/http/stats"
 	pkglogging "knative.dev/pkg/logging"
 	"knative.dev/pkg/logging/logkey"
 	"knative.dev/pkg/metrics"
 	pkgnet "knative.dev/pkg/network"
-	pkghandler "knative.dev/pkg/network/handlers"
 	"knative.dev/pkg/profiling"
 	"knative.dev/pkg/signals"
 	"knative.dev/pkg/tracing"
 	tracingconfig "knative.dev/pkg/tracing/config"
 	"knative.dev/pkg/tracing/propagation/tracecontextb3"
-	"knative.dev/serving/pkg/activator"
 	pkghttp "knative.dev/serving/pkg/http"
-	"knative.dev/serving/pkg/http/handler"
 	"knative.dev/serving/pkg/logging"
 	"knative.dev/serving/pkg/networking"
 	"knative.dev/serving/pkg/queue"
-	"knative.dev/serving/pkg/queue/health"
 	"knative.dev/serving/pkg/queue/readiness"
 )
@@ -246,29 +239,32 @@ func Main(opts ...Option) error {
 	// Enable TLS when certificate is mounted.
 	tlsEnabled := exists(logger, certPath) && exists(logger, keyPath)
 
-	mainServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, false)
+	mainHandler, drainer := mainHandler(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint)
+	adminHandler := adminHandler(d.Ctx, logger, drainer)
+
+	// Enable TLS server when activator server certs are mounted.
+	// At this moment activator with TLS does not disable HTTP.
+	// See also https://github.com/knative/serving/issues/12808.
 	httpServers := map[string]*http.Server{
-		"main":    mainServer,
-		"metrics": buildMetricsServer(protoStatReporter),
-		"admin":   buildAdminServer(d.Ctx, logger, drainer),
+		"main":    mainServer(":"+env.QueueServingPort, mainHandler),
+		"admin":   adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler),
+		"metrics": metricsServer(protoStatReporter),
 	}
+
 	if env.EnableProfiling {
 		httpServers["profile"] = profiling.NewServer(profiling.NewHandler(logger, true))
 	}
 
-	// Enable TLS server when activator server certs are mounted.
-	// At this moment activator with TLS does not disable HTTP.
-	// See also https://github.com/knative/serving/issues/12808.
-	var tlsServers map[string]*http.Server
+	tlsServers := map[string]*http.Server{
+		"main":  mainServer(":"+env.QueueServingTLSPort, mainHandler),
+		"admin": adminServer(":"+strconv.Itoa(networking.QueueAdminPort), adminHandler),
+	}
+
 	if tlsEnabled {
-		mainTLSServer, drainer := buildServer(d.Ctx, env, d.Transport, probe, stats, logger, concurrencyendpoint, true /* enable TLS */)
-		tlsServers = map[string]*http.Server{
-			"tlsMain":  mainTLSServer,
-			"tlsAdmin": buildAdminServer(d.Ctx, logger, drainer),
-		}
-		// Drop admin http server as we Use TLS for the admin server.
-		// TODO: The drain created with mainServer above is lost. Unify the two drain.
+		// Drop admin http server since the admin TLS server is listening on the same port
 		delete(httpServers, "admin")
+	} else {
+		tlsServers = map[string]*http.Server{}
 	}
 
 	logger.Info("Starting queue-proxy")
@@ -277,6 +273,7 @@ func Main(opts ...Option) error {
 	for name, server := range httpServers {
 		go func(name string, s *http.Server) {
 			// Don't forward ErrServerClosed as that indicates we're already shutting down.
+			logger.Info("Starting http server ", name, s.Addr)
 			if err := s.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
 				errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
 			}
@@ -285,6 +282,7 @@ func Main(opts ...Option) error {
 	for name, server := range tlsServers {
 		go func(name string, s *http.Server) {
 			// Don't forward ErrServerClosed as that indicates we're already shutting down.
+			logger.Info("Starting tls server ", name, s.Addr)
 			if err := s.ListenAndServeTLS(certPath, keyPath); err != nil && !errors.Is(err, http.ErrServerClosed) {
 				errCh <- fmt.Errorf("%s server failed to serve: %w", name, err)
 			}
@@ -306,13 +304,16 @@ func Main(opts ...Option) error {
 	logger.Infof("Sleeping %v to allow K8s propagation of non-ready state", drainSleepDuration)
 	drainer.Drain()
 
-	// Removing the main server from the shutdown logic as we've already shut it down.
-	delete(httpServers, "main")
-
-	for serverName, srv := range httpServers {
-		logger.Info("Shutting down server: ", serverName)
+	for name, srv := range httpServers {
+		logger.Info("Shutting down server: ", name)
+		if err := srv.Shutdown(context.Background()); err != nil {
+			logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err))
+		}
+	}
+	for name, srv := range tlsServers {
+		logger.Info("Shutting down server: ", name)
 		if err := srv.Shutdown(context.Background()); err != nil {
-			logger.Errorw("Failed to shutdown server", zap.String("server", serverName), zap.Error(err))
+			logger.Errorw("Failed to shutdown server", zap.String("server", name), zap.Error(err))
 		}
 	}
 	logger.Info("Shutdown complete, exiting...")
@@ -339,84 +340,6 @@ func buildProbe(logger *zap.SugaredLogger, encodedProbe string, autodetectHTTP2
 	return readiness.NewProbe(coreProbe)
 }
 
-func buildServer(ctx context.Context, env config, transport http.RoundTripper, probeContainer func() bool, stats *netstats.RequestStats, logger *zap.SugaredLogger,
-	ce *queue.ConcurrencyEndpoint, enableTLS bool) (*http.Server, *pkghandler.Drainer) {
-	// TODO: If TLS is enabled, execute probes twice and tracking two different sets of container health.
-
-	target := net.JoinHostPort("127.0.0.1", env.UserPort)
-
-	httpProxy := pkghttp.NewHeaderPruningReverseProxy(target, pkghttp.NoHostOverride, activator.RevisionHeaders, false /* use HTTP */)
-	httpProxy.Transport = transport
-	httpProxy.ErrorHandler = pkghandler.Error(logger)
-	httpProxy.BufferPool = netproxy.NewBufferPool()
-	httpProxy.FlushInterval = netproxy.FlushInterval
-
-	// TODO: During HTTP and HTTPS transition, counting concurrency could not be accurate. Count accurately.
-	breaker := buildBreaker(logger, env)
-	metricsSupported := supportsMetrics(ctx, logger, env, enableTLS)
-	tracingEnabled := env.TracingConfigBackend != tracingconfig.None
-	concurrencyStateEnabled := env.ConcurrencyStateEndpoint != ""
-	timeout := time.Duration(env.RevisionTimeoutSeconds) * time.Second
-	var responseStartTimeout = 0 * time.Second
-	if env.RevisionResponseStartTimeoutSeconds != 0 {
-		responseStartTimeout = time.Duration(env.RevisionResponseStartTimeoutSeconds) * time.Second
-	}
-	var idleTimeout = 0 * time.Second
-	if env.RevisionIdleTimeoutSeconds != 0 {
-		idleTimeout = time.Duration(env.RevisionIdleTimeoutSeconds) * time.Second
-	}
-	// Create queue handler chain.
-	// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first.
-	var composedHandler http.Handler = httpProxy
-	if concurrencyStateEnabled {
-		logger.Info("Concurrency state endpoint set, tracking request counts, using endpoint: ", ce.Endpoint())
-		go func() {
-			for range time.NewTicker(1 * time.Minute).C {
-				ce.RefreshToken()
-			}
-		}()
-		composedHandler = queue.ConcurrencyStateHandler(logger, composedHandler, ce.Pause, ce.Resume)
-		// start paused
-		ce.Pause(logger)
-	}
-	if metricsSupported {
-		composedHandler = requestAppMetricsHandler(logger, composedHandler, breaker, env)
-	}
-	composedHandler = queue.ProxyHandler(breaker, stats, tracingEnabled, composedHandler)
-	composedHandler = queue.ForwardedShimHandler(composedHandler)
-	composedHandler = handler.NewTimeoutHandler(composedHandler, "request timeout", func(r *http.Request) (time.Duration, time.Duration, time.Duration) {
-		return timeout, responseStartTimeout, idleTimeout
-	})
-
-	if metricsSupported {
-		composedHandler = requestMetricsHandler(logger, composedHandler, env)
-	}
-	if tracingEnabled {
-		composedHandler = tracing.HTTPSpanMiddleware(composedHandler)
-	}
-
-	drainer := &pkghandler.Drainer{
-		QuietPeriod: drainSleepDuration,
-		// Add Activator probe header to the drainer so it can handle probes directly from activator
-		HealthCheckUAPrefixes: []string{netheader.ActivatorUserAgent, netheader.AutoscalingUserAgent},
-		Inner:                 composedHandler,
-		HealthCheck:           health.ProbeHandler(probeContainer, tracingEnabled),
-	}
-	composedHandler = drainer
-
-	if env.ServingEnableRequestLog {
-		// We want to capture the probes/healthchecks in the request logs.
-		// Hence we need to have RequestLogHandler be the first one.
-		composedHandler = requestLogHandler(logger, composedHandler, env)
-	}
-
-	if enableTLS {
-		return pkgnet.NewServer(":"+env.QueueServingTLSPort, composedHandler), drainer
-	}
-
-	return pkgnet.NewServer(":"+env.QueueServingPort, composedHandler), drainer
-}
-
 func buildTransport(env config) http.RoundTripper {
 	maxIdleConns := 1000 // TODO: somewhat arbitrary value for CC=0, needs experimental validation.
 	if env.ContainerConcurrency > 0 {
@@ -452,11 +375,7 @@ func buildBreaker(logger *zap.SugaredLogger, env config) *queue.Breaker {
 	return queue.NewBreaker(params)
 }
 
-func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config, enableTLS bool) bool {
-	// Keep it on HTTP because Metrics needs to be registered on either TLS server or non-TLS server.
-	if enableTLS {
-		return false
-	}
+func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config) bool {
 	// Setup request metrics reporting for end-user metrics.
 	if env.ServingRequestMetricsBackend == "" {
 		return false
@@ -469,45 +388,6 @@ func supportsMetrics(ctx context.Context, logger *zap.SugaredLogger, env config,
 	return true
 }
 
-func buildAdminServer(ctx context.Context, logger *zap.SugaredLogger, drainer *pkghandler.Drainer) *http.Server {
-	adminMux := http.NewServeMux()
-	adminMux.HandleFunc(queue.RequestQueueDrainPath, func(w http.ResponseWriter, r *http.Request) {
-		logger.Info("Attached drain handler from user-container", r)
-
-		go func() {
-			select {
-			case <-ctx.Done():
-			case <-time.After(time.Second):
-				// If the context isn't done then the queue proxy didn't
-				// receive a TERM signal. Thus the user-container's
-				// liveness probes are triggering the container to restart
-				// and we shouldn't block that
-				drainer.Reset()
-			}
-		}()
-
-		drainer.Drain()
-		w.WriteHeader(http.StatusOK)
-	})
-
-	//nolint:gosec // https://github.com/knative/serving/issues/13439
-	return &http.Server{
-		Addr:    ":" + strconv.Itoa(networking.QueueAdminPort),
-		Handler: adminMux,
-	}
-}
-
-func buildMetricsServer(protobufStatReporter *queue.ProtobufStatsReporter) *http.Server {
-	metricsMux := http.NewServeMux()
-	metricsMux.Handle("/metrics", queue.NewStatsHandler(protobufStatReporter))
-
-	//nolint:gosec // https://github.com/knative/serving/issues/13439
-	return &http.Server{
-		Addr:    ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort),
-		Handler: metricsMux,
-	}
-}
-
 func requestLogHandler(logger *zap.SugaredLogger, currentHandler http.Handler, env config) http.Handler {
 	revInfo := &pkghttp.RequestLogRevision{
 		Name: env.ServingRevision,
diff --git a/pkg/queue/sharedmain/servers.go b/pkg/queue/sharedmain/servers.go
new file mode 100644
index 000000000000..d8e1fe9a3486
--- /dev/null
+++ b/pkg/queue/sharedmain/servers.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2018 The Knative Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package sharedmain
+
+import (
+	"net/http"
+	"strconv"
+	"time"
+
+	pkgnet "knative.dev/pkg/network"
+	"knative.dev/serving/pkg/networking"
+	"knative.dev/serving/pkg/queue"
+)
+
+func mainServer(addr string, handler http.Handler) *http.Server {
+	return pkgnet.NewServer(addr, handler)
+}
+
+func adminServer(addr string, handler http.Handler) *http.Server {
+	return &http.Server{
+		Addr:    addr,
+		Handler: handler,
+		// https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
+		ReadHeaderTimeout: time.Minute,
+	}
+}
+
+func metricsServer(reporter *queue.ProtobufStatsReporter) *http.Server {
+	metricsMux := http.NewServeMux()
+	metricsMux.Handle("/metrics", queue.NewStatsHandler(reporter))
+
+	return &http.Server{
+		Addr:              ":" + strconv.Itoa(networking.AutoscalingQueueMetricsPort),
+		Handler:           metricsMux,
+		ReadHeaderTimeout: time.Minute, // https://medium.com/a-journey-with-go/go-understand-and-mitigate-slowloris-attack-711c1b1403f6
+	}
+}
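
A note on the recurring ReadHeaderTimeout lines above: they resolve gosec's G112 finding tracked in https://github.com/knative/serving/issues/13439. Without a header read deadline, a Slowloris-style client can hold a connection (and its server goroutine) open indefinitely by trickling request headers one byte at a time. Below is a minimal standalone sketch of the pattern every server in this patch now follows; the port and handler are illustrative, not taken from the PR.

package main

import (
	"net/http"
	"time"
)

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	server := &http.Server{
		Addr:    ":8080", // illustrative port
		Handler: mux,
		// Give up on connections whose clients take longer than a minute
		// to finish sending request headers, so a slow-header client
		// cannot pin a connection open forever.
		ReadHeaderTimeout: time.Minute,
	}
	server.ListenAndServe()
}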
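For reviewers new to the wrapping idiom in mainHandler: each assignment to composedHandler wraps the previous value, so the handler added last (the drainer, or the request logger when enabled) sees a request first, and the reverse proxy runs innermost. Here is a self-contained sketch of the same idiom; wrap and the labels are made up for illustration and do not exist in the PR.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// wrap returns a middleware that logs its label before delegating to next.
func wrap(label string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("enter", label)
		next.ServeHTTP(w, r)
	})
}

func main() {
	var composed http.Handler = http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
		fmt.Println("innermost: proxy to the user container")
	})
	composed = wrap("proxy/breaker", composed) // specified first, runs last
	composed = wrap("timeout", composed)
	composed = wrap("drainer", composed) // specified last, runs first

	composed.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest("GET", "/", nil))
	// Prints: enter drainer, enter timeout, enter proxy/breaker, then the innermost line.
}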
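The rewritten shutdown path also retires the old TODO about the lost drainer: there is now a single drainer shared by the HTTP and TLS handler chains, and both server maps are shut down entry by entry instead of deleting "main" first. A hedged sketch of that drain-then-shutdown order follows; the map contents, ports, and sleep are stand-ins, not the PR's actual wiring.

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

func main() {
	// Stand-ins for the httpServers/tlsServers maps in Main.
	servers := map[string]*http.Server{
		"main":  {Addr: ":8080", ReadHeaderTimeout: time.Minute},
		"admin": {Addr: ":8022", ReadHeaderTimeout: time.Minute},
	}
	for name, srv := range servers {
		go func(name string, s *http.Server) {
			// Don't treat ErrServerClosed as a failure; it just means Shutdown ran.
			if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
				log.Printf("%s server failed: %v", name, err)
			}
		}(name, srv)
	}

	time.Sleep(100 * time.Millisecond) // demo only: let the listeners start

	// Draining would happen here; then every server is shut down in one
	// pass, mirroring the new loops in Main.
	for name, srv := range servers {
		log.Print("shutting down server: ", name)
		if err := srv.Shutdown(context.Background()); err != nil {
			log.Printf("failed to shutdown %s: %v", name, err)
		}
	}
}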