Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only allocate named logger if necessary in activator #11008

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/activator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func main() {

// Create activation handler chain
// Note: innermost handlers are specified first, ie. the last handler in the chain will be executed first
var ah http.Handler = activatorhandler.New(ctx, throttler, transport)
var ah http.Handler = activatorhandler.New(ctx, throttler, transport, logger)
ah = concurrencyReporter.Handler(ah)
ah = tracing.HTTPSpanMiddleware(ah)
ah = configStore.HTTPMiddleware(ah)
Expand Down
4 changes: 1 addition & 3 deletions pkg/activator/handler/context_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,15 @@ func (h *contextHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

revID := types.NamespacedName{Namespace: namespace, Name: name}
logger := h.logger.With(zap.String(logkey.Key, revID.String()))

revision, err := h.revisionLister.Revisions(namespace).Get(name)
if err != nil {
logger.Errorw("Error while getting revision", zap.Error(err))
h.logger.Errorw("Error while getting revision", zap.String(logkey.Key, revID.String()), zap.Error(err))
sendError(err, w)
return
}

ctx := r.Context()
ctx = logging.WithLogger(ctx, logger)
ctx = WithRevision(ctx, revision)
ctx = WithRevID(ctx, revID)

Expand Down
20 changes: 12 additions & 8 deletions pkg/activator/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"k8s.io/apimachinery/pkg/types"

network "knative.dev/networking/pkg"
"knative.dev/pkg/logging"
"knative.dev/pkg/logging/logkey"
pkgnet "knative.dev/pkg/network"
tracingconfig "knative.dev/pkg/tracing/config"
"knative.dev/pkg/tracing/propagation/tracecontextb3"
Expand All @@ -50,10 +50,11 @@ type activationHandler struct {
tracingTransport http.RoundTripper
throttler Throttler
bufferPool httputil.BufferPool
logger *zap.SugaredLogger
}

// New constructs a new http.Handler that deals with revision activation.
func New(_ context.Context, t Throttler, transport http.RoundTripper) http.Handler {
func New(_ context.Context, t Throttler, transport http.RoundTripper, logger *zap.SugaredLogger) http.Handler {
return &activationHandler{
transport: transport,
tracingTransport: &ochttp.Transport{
Expand All @@ -62,26 +63,27 @@ func New(_ context.Context, t Throttler, transport http.RoundTripper) http.Handl
},
throttler: t,
bufferPool: network.NewBufferPool(),
logger: logger,
}
}

func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := logging.FromContext(r.Context())
tracingEnabled := activatorconfig.FromContext(r.Context()).Tracing.Backend != tracingconfig.None

tryContext, trySpan := r.Context(), (*trace.Span)(nil)
if tracingEnabled {
tryContext, trySpan = trace.StartSpan(r.Context(), "throttler_try")
}

if err := a.throttler.Try(tryContext, RevIDFrom(r.Context()), func(dest string) error {
revID := RevIDFrom(r.Context())
if err := a.throttler.Try(tryContext, revID, func(dest string) error {
trySpan.End()

proxyCtx, proxySpan := r.Context(), (*trace.Span)(nil)
if tracingEnabled {
proxyCtx, proxySpan = trace.StartSpan(r.Context(), "activator_proxy")
}
a.proxyRequest(logger, w, r.WithContext(proxyCtx), dest, tracingEnabled)
a.proxyRequest(revID, w, r.WithContext(proxyCtx), dest, tracingEnabled)
proxySpan.End()

return nil
Expand All @@ -90,7 +92,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
trySpan.Annotate([]trace.Attribute{trace.StringAttribute("activator.throttler.error", err.Error())}, "ThrottlerTry")
trySpan.End()

logger.Errorw("Throttler try error", zap.Error(err))
a.logger.Errorw("Throttler try error", zap.String(logkey.Key, revID.String()), zap.Error(err))

if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, queue.ErrRequestQueueFull) {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
Expand All @@ -100,7 +102,7 @@ func (a *activationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (a *activationHandler) proxyRequest(logger *zap.SugaredLogger, w http.ResponseWriter, r *http.Request, target string, tracingEnabled bool) {
func (a *activationHandler) proxyRequest(revID types.NamespacedName, w http.ResponseWriter, r *http.Request, target string, tracingEnabled bool) {
network.RewriteHostIn(r)
r.Header.Set(network.ProxyHeaderName, activator.Name)

Expand All @@ -112,7 +114,9 @@ func (a *activationHandler) proxyRequest(logger *zap.SugaredLogger, w http.Respo
proxy.Transport = a.tracingTransport
}
proxy.FlushInterval = network.FlushInterval
proxy.ErrorHandler = pkgnet.ErrorHandler(logger)
proxy.ErrorHandler = func(w http.ResponseWriter, req *http.Request, err error) {
pkgnet.ErrorHandler(a.logger.With(zap.String(logkey.Key, revID.String())))(w, req, err)
}

proxy.ServeHTTP(w, r)
}
8 changes: 4 additions & 4 deletions pkg/activator/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestActivationHandler(t *testing.T) {

ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()
handler := New(ctx, test.throttler, rt)
handler := New(ctx, test.throttler, rt, logging.FromContext(ctx))

resp := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestActivationHandlerProxyHeader(t *testing.T) {
ctx, cancel, _ := rtesting.SetupFakeContextWithCancel(t)
defer cancel()

handler := New(ctx, fakeThrottler{}, rt)
handler := New(ctx, fakeThrottler{}, rt, logging.FromContext(ctx))

writer := httptest.NewRecorder()
req := httptest.NewRequest(http.MethodPost, "http://example.com", nil)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestActivationHandlerTraceSpans(t *testing.T) {
oct.Finish()
}()

handler := New(ctx, fakeThrottler{}, rt)
handler := New(ctx, fakeThrottler{}, rt, logging.FromContext(ctx))

// Set up config store to populate context.
configStore := setupConfigStore(t, logging.FromContext(ctx))
Expand Down Expand Up @@ -311,7 +311,7 @@ func BenchmarkHandler(b *testing.B) {
}, nil
})

handler := New(ctx, fakeThrottler{}, rt)
handler := New(ctx, fakeThrottler{}, rt, logging.FromContext(ctx))

request := func() *http.Request {
req := httptest.NewRequest(http.MethodGet, "http://example.com", nil)
Expand Down