From df601fe449072963fe202be93c21a664c510f633 Mon Sep 17 00:00:00 2001 From: Jonathan Amsterdam Date: Thu, 29 Feb 2024 15:34:20 -0500 Subject: [PATCH] internal/middleware,etc.: store requests This CL provides several improvements to the worker home page: - Display all requests, not just fetches. - Link to the logs for each request. - Link that will cancel a request. At the heart of these changes is a new piece of middleware that tracks all active requests, along with their trace ID and a function that can be used to cancel them. This change also affects logging, because the logger doesn't need to maintain its own trace ID. Change-Id: Id022170073d2d7ca4e45aaa1d78b216d8a512f35 Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/568236 LUCI-TryBot-Result: Go LUCI Reviewed-by: Michael Matloob kokoro-CI: kokoro --- cmd/frontend/main.go | 1 + cmd/worker/main.go | 1 + go.mod | 1 + go.sum | 2 + internal/log/stackdriverlogger/log.go | 19 ++------ internal/middleware/requestlog.go | 18 +++---- internal/middleware/requests.go | 68 +++++++++++++++++++++++++++ internal/request_info.go | 51 ++++++++++++++++++++ internal/trace/trace.go | 2 +- internal/worker/fetch.go | 9 ++-- internal/worker/fetchinfo.go | 17 ++++--- internal/worker/pages.go | 25 +++++----- internal/worker/server.go | 51 +++++++++++++++++++- static/worker/index.tmpl | 31 +++++++++++- 14 files changed, 244 insertions(+), 52 deletions(-) create mode 100644 internal/middleware/requests.go create mode 100644 internal/request_info.go diff --git a/cmd/frontend/main.go b/cmd/frontend/main.go index f1f841b94..ad946ec53 100644 --- a/cmd/frontend/main.go +++ b/cmd/frontend/main.go @@ -218,6 +218,7 @@ func main() { ermw = middleware.ErrorReporting(reporter) } mw := middleware.Chain( + middleware.RequestInfo(), middleware.RequestLog(cmdconfig.Logger(ctx, cfg, "frontend-log")), middleware.AcceptRequests(http.MethodGet, http.MethodPost, http.MethodHead), // accept only GETs, POSTs and HEADs middleware.BetaPkgGoDevRedirect(), diff --git a/cmd/worker/main.go b/cmd/worker/main.go index 27b37cc4c..c93a145b5 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -143,6 +143,7 @@ func main() { } mw := middleware.Chain( + middleware.RequestInfo(), // must be first middleware.RequestLog(cmdconfig.Logger(ctx, cfg, "worker-log")), mtimeout.Timeout(time.Duration(timeout)*time.Minute), iap, diff --git a/go.mod b/go.mod index 913472eba..5dd64ad0a 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( github.com/lib/pq v1.10.9 github.com/russross/blackfriday/v2 v2.1.0 go.opencensus.io v0.24.0 + golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 golang.org/x/mod v0.15.0 golang.org/x/net v0.21.0 golang.org/x/sync v0.6.0 diff --git a/go.sum b/go.sum index 2df091e22..55757c302 100644 --- a/go.sum +++ b/go.sum @@ -1116,6 +1116,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225 h1:LfspQV/FYTatPTr/3HzIcmiUFH7PGP+OQ6mgDYo3yuQ= +golang.org/x/exp v0.0.0-20240222234643-814bf88cf225/go.mod h1:CxmFvTBINI24O/j8iY7H1xHzx2i4OsyguNBmN/uPtqc= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= diff --git a/internal/log/stackdriverlogger/log.go b/internal/log/stackdriverlogger/log.go index 184e858a6..0238070f1 100644 --- a/internal/log/stackdriverlogger/log.go +++ b/internal/log/stackdriverlogger/log.go @@ -16,6 +16,7 @@ import ( "sync" "cloud.google.com/go/logging" + "golang.org/x/pkgsite/internal" "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/experiment" "golang.org/x/pkgsite/internal/log" @@ -31,18 +32,8 @@ func init() { } } -type ( - // traceIDKey is the type of the context key for trace IDs. - traceIDKey struct{} - - // labelsKey is the type of the context key for labels. - labelsKey struct{} -) - -// NewContextWithTraceID creates a new context from ctx that adds the trace ID. -func NewContextWithTraceID(ctx context.Context, traceID string) context.Context { - return context.WithValue(ctx, traceIDKey{}, traceID) -} +// labelsKey is the type of the context key for labels. +type labelsKey struct{} // NewContextWithLabel creates a new context from ctx that adds a label that will // appear in the log entry. @@ -90,7 +81,7 @@ func (l *logger) Log(ctx context.Context, s log.Severity, payload any) { if err, ok := payload.(error); ok { payload = err.Error() } - traceID, _ := ctx.Value(traceIDKey{}).(string) // if not present, traceID is "", which is fine + labels, _ := ctx.Value(labelsKey{}).(map[string]string) es := experimentString(ctx) if len(es) > 0 { @@ -105,7 +96,7 @@ func (l *logger) Log(ctx context.Context, s log.Severity, payload any) { Severity: stackdriverSeverity(s), Labels: labels, Payload: payload, - Trace: traceID, + Trace: internal.RequestInfoFromContext(ctx).TraceID, }) } diff --git a/internal/middleware/requestlog.go b/internal/middleware/requestlog.go index 93b42673a..9272e725d 100644 --- a/internal/middleware/requestlog.go +++ b/internal/middleware/requestlog.go @@ -13,8 +13,8 @@ import ( "time" "cloud.google.com/go/logging" + "golang.org/x/pkgsite/internal" "golang.org/x/pkgsite/internal/log" - "golang.org/x/pkgsite/internal/log/stackdriverlogger" ) // Logger is the interface used to write request logs to GCP. @@ -40,12 +40,12 @@ func (l LocalLogger) Log(entry logging.Entry) { } // RequestLog returns a middleware that logs each incoming requests using the -// given logger. This logger replaces the built-in appengine request logger, -// which logged PII when behind IAP, in such a way that was impossible to turn -// off. +// given logger. // // Logs may be viewed in Pantheon by selecting the log source corresponding to -// the AppEngine service name (e.g. 'dev-worker'). +// the service name (e.g. 'dev-worker'). +// +// Install this middleware after RequestInfo to ensure that trace IDs appear in the log. func RequestLog(lg Logger) Middleware { return func(h http.Handler) http.Handler { return &handler{delegate: h, logger: lg} @@ -59,21 +59,21 @@ type handler struct { func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { start := time.Now() - traceID := r.Header.Get("X-Cloud-Trace-Context") severity := logging.Info if r.Method == http.MethodGet && r.URL.Path == "/healthz" { severity = logging.Debug } + requestInfo := internal.RequestInfoFromContext(r.Context()) h.logger.Log(logging.Entry{ HTTPRequest: &logging.HTTPRequest{Request: r}, Payload: map[string]string{ "requestType": "request start", }, Severity: severity, - Trace: traceID, + Trace: requestInfo.TraceID, }) w2 := &responseWriter{ResponseWriter: w} - h.delegate.ServeHTTP(w2, r.WithContext(stackdriverlogger.NewContextWithTraceID(r.Context(), traceID))) + h.delegate.ServeHTTP(w2, r) s := severity if w2.status == http.StatusServiceUnavailable { // load shedding is a warning, not an error @@ -92,7 +92,7 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { "isRobot": isRobot(r.Header.Get("User-Agent")), }, Severity: s, - Trace: traceID, + Trace: requestInfo.TraceID, }) } diff --git a/internal/middleware/requests.go b/internal/middleware/requests.go new file mode 100644 index 000000000..cc3f37aae --- /dev/null +++ b/internal/middleware/requests.go @@ -0,0 +1,68 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package middleware + +import ( + "context" + "net/http" + "sync" + + "golang.org/x/pkgsite/internal" +) + +var ( + requestMapMu sync.Mutex + requestMap = map[string]*internal.RequestInfo{} +) + +// RequestInfo adds information about the request to a context. +// It also stores it while the request is active. +// [ActiveRequests] retrieves all stored requests. +func RequestInfo() Middleware { + return func(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + ri := internal.NewRequestInfo(r) + ctx, cancel := context.WithCancelCause(r.Context()) + ri.Cancel = cancel + + // If the request has a trace ID, store it in the requestMap while + // it is active. + if ri.TraceID != "" { + requestMapMu.Lock() + requestMap[ri.TraceID] = ri + requestMapMu.Unlock() + + defer func() { + requestMapMu.Lock() + delete(requestMap, ri.TraceID) + requestMapMu.Unlock() + }() + } + + ctx = internal.NewContextWithRequestInfo(ctx, ri) + h.ServeHTTP(w, r.WithContext(ctx)) + }) + } +} + +// ActiveRequests returns all requests that are currently being handled by the server, +// in no particular order. +func ActiveRequests() []*internal.RequestInfo { + requestMapMu.Lock() + defer requestMapMu.Unlock() + var ris []*internal.RequestInfo + for _, ri := range requestMap { + ris = append(ris, ri) + } + return ris +} + +// RequestForTraceID returns the active request with the given trace ID, +// or nil if there is no such request. +func RequestForTraceID(id string) *internal.RequestInfo { + requestMapMu.Lock() + defer requestMapMu.Unlock() + return requestMap[id] +} diff --git a/internal/request_info.go b/internal/request_info.go new file mode 100644 index 000000000..62cbecf85 --- /dev/null +++ b/internal/request_info.go @@ -0,0 +1,51 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package internal + +import ( + "context" + "net/http" + "time" +) + +// RequestInfo is information about an HTTP request. +type RequestInfo struct { + Request *http.Request + TrimmedURL string // URL with the scheme and host removed + TraceID string // extracted from request header + Start time.Time // when the request began + Cancel func(error) // function that cancels the request's context +} + +func NewRequestInfo(r *http.Request) *RequestInfo { + turl := *r.URL + turl.Scheme = "" + turl.Host = "" + turl.User = nil + return &RequestInfo{ + Request: r, + TrimmedURL: turl.String(), + TraceID: r.Header.Get("X-Cloud-Trace-Context"), + Start: time.Now(), + } +} + +// requestInfoKey is the type of the context key for RequestInfos. +type requestInfoKey struct{} + +// NewContextWithRequestInfo creates a new context from ctx that adds the trace ID. +func NewContextWithRequestInfo(ctx context.Context, ri *RequestInfo) context.Context { + return context.WithValue(ctx, requestInfoKey{}, ri) +} + +// RequestInfoFromContext retrieves the trace ID from the context, or a zero one +// if it isn't there. +func RequestInfoFromContext(ctx context.Context) *RequestInfo { + ri, _ := ctx.Value(requestInfoKey{}).(*RequestInfo) + if ri == nil { + return &RequestInfo{} + } + return ri +} diff --git a/internal/trace/trace.go b/internal/trace/trace.go index 8517aabd6..c686ce801 100644 --- a/internal/trace/trace.go +++ b/internal/trace/trace.go @@ -3,7 +3,7 @@ // license that can be found in the LICENSE file. // package trace provides a wrapper around third party tracing -// libraries +// libraries. package trace import "context" diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go index 55dcfb5f9..d665f85fb 100644 --- a/internal/worker/fetch.go +++ b/internal/worker/fetch.go @@ -157,10 +157,11 @@ func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requested } fi := &FetchInfo{ - ModulePath: modulePath, - Version: requestedVersion, - ZipSize: uint64(zipSize), - Start: time.Now(), + RequestInfo: internal.RequestInfoFromContext(ctx), + ModulePath: modulePath, + Version: requestedVersion, + ZipSize: uint64(zipSize), + Start: time.Now(), } startFetchInfo(fi) defer func() { finishFetchInfo(fi, status, err) }() diff --git a/internal/worker/fetchinfo.go b/internal/worker/fetchinfo.go index 2052e422c..8b9d4d62e 100644 --- a/internal/worker/fetchinfo.go +++ b/internal/worker/fetchinfo.go @@ -8,18 +8,21 @@ import ( "sort" "sync" "time" + + "golang.org/x/pkgsite/internal" ) // FetchInfo describes a fetch in progress, or completed. // It is used to display information on the worker home page. type FetchInfo struct { - ModulePath string - Version string - ZipSize uint64 - Start time.Time - Finish time.Time - Status int - Error error + RequestInfo *internal.RequestInfo + ModulePath string + Version string + ZipSize uint64 + Start time.Time + Finish time.Time + Status int + Error error } var ( diff --git a/internal/worker/pages.go b/internal/worker/pages.go index dd9c13c0a..bd606f8db 100644 --- a/internal/worker/pages.go +++ b/internal/worker/pages.go @@ -19,10 +19,10 @@ import ( "github.com/google/safehtml/template" "golang.org/x/pkgsite/internal" "golang.org/x/pkgsite/internal/config" - "golang.org/x/pkgsite/internal/config/serverconfig" "golang.org/x/pkgsite/internal/derrors" "golang.org/x/pkgsite/internal/log" "golang.org/x/pkgsite/internal/memory" + "golang.org/x/pkgsite/internal/middleware" "golang.org/x/pkgsite/internal/postgres" "golang.org/x/sync/errgroup" ) @@ -60,17 +60,14 @@ func (s *Server) doIndexPage(w http.ResponseWriter, r *http.Request) (err error) if err != nil { log.Warningf(ctx, "could not get cgroup stats: %v", err) } - var logsURL string - if serverconfig.OnGKE() { - env := s.cfg.DeploymentEnvironment() - cluster := env + "-" + "pkgsite" - logsURL = `https://pantheon.corp.google.com/logs/query;query=resource.type%3D%22k8s_container%22%20resource.labels.cluster_name%3D%22` + - cluster + - `%22%20resource.labels.container_name%3D%22worker%22?project=` + - s.cfg.ProjectID - } else { - logsURL = `https://cloud.google.com/console/logs/viewer?resource=gae_app%2Fmodule_id%2F` + s.cfg.ServiceID + `&project=` + - s.cfg.ProjectID + + // Display requests that aren't fetches separately. + // Don't include the request for this page itself. + var nonFetchRequests []*internal.RequestInfo + for _, ri := range middleware.ActiveRequests() { + if ri.TrimmedURL != "/" && !strings.HasPrefix(ri.TrimmedURL, "/fetch/") { + nonFetchRequests = append(nonFetchRequests, ri) + } } page := struct { @@ -88,7 +85,7 @@ func (s *Server) doIndexPage(w http.ResponseWriter, r *http.Request) (err error) SystemStats memory.SystemStats CgroupStats map[string]uint64 Fetches []*FetchInfo - LogsURL string + OtherRequests []*internal.RequestInfo DBInfo *postgres.UserInfo }{ Config: s.cfg, @@ -104,7 +101,7 @@ func (s *Server) doIndexPage(w http.ResponseWriter, r *http.Request) (err error) SystemStats: sms, CgroupStats: cms, Fetches: FetchInfos(), - LogsURL: logsURL, + OtherRequests: nonFetchRequests, DBInfo: s.workerDBInfo(), } return renderPage(ctx, w, page, s.templates[indexTemplate]) diff --git a/internal/worker/server.go b/internal/worker/server.go index e99a732f2..903d0fea9 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -14,6 +14,7 @@ import ( "io" "math" "net/http" + "net/url" "reflect" "strconv" "strings" @@ -86,7 +87,7 @@ func NewServer(cfg *config.Config, scfg ServerConfig) (_ *Server, err error) { defer derrors.Wrap(&err, "NewServer(db, %+v)", scfg) templates := map[string]*template.Template{} for _, templateName := range []string{indexTemplate, versionsTemplate, excludedTemplate} { - t, err := parseTemplate(scfg.StaticPath, templateName) + t, err := parseTemplate(cfg, scfg.StaticPath, templateName) if err != nil { return nil, err } @@ -235,6 +236,9 @@ func (s *Server) Install(handle func(string, http.Handler)) { // manual ("module" query param): clean all versions of a given module. handle("/clean", rmw(s.errorHandler(s.handleClean))) + // manual: cancel an active request + handle("/cancel", rmw(s.errorHandler(s.handleCancel))) + handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(s.staticPath.String())))) // Health check. @@ -741,6 +745,27 @@ func (s *Server) handleClean(w http.ResponseWriter, r *http.Request) (err error) } } +func (s *Server) handleCancel(w http.ResponseWriter, r *http.Request) (err error) { + defer derrors.Wrap(&err, "handleCancel") + traceID := r.FormValue("trace") + if traceID == "" { + return &serverError{ + http.StatusBadRequest, + errors.New("must provide 'traceID' query param"), + } + } + ri := middleware.RequestForTraceID(traceID) + if ri == nil { + return &serverError{http.StatusNotFound, errors.New("no request with that trace ID")} + } + if ri.Cancel == nil { + return errors.New("RequestInfo.Cancel is nil") + } + ri.Cancel(errors.New("/cancel handler")) + fmt.Fprintf(w, "request with trace ID %s canceled\n", traceID) + return nil +} + func (s *Server) handleHealthCheck(w http.ResponseWriter, r *http.Request) { if err := s.db.Underlying().Ping(); err != nil { http.Error(w, fmt.Sprintf("DB ping failed: %v", err), http.StatusInternalServerError) @@ -750,7 +775,7 @@ func (s *Server) handleHealthCheck(w http.ResponseWriter, r *http.Request) { } // Parse the template for the status page. -func parseTemplate(staticPath template.TrustedSource, filename string) (*template.Template, error) { +func parseTemplate(cfg *config.Config, staticPath template.TrustedSource, filename string) (*template.Template, error) { if staticPath.String() == "" { return nil, nil } @@ -759,6 +784,27 @@ func parseTemplate(staticPath template.TrustedSource, filename string) (*templat if err != nil { return nil, err } + + var logURLBase, projectParam string + if serverconfig.OnGKE() { + cluster := cfg.DeploymentEnvironment() + "-" + "pkgsite" + logURLBase = `https://pantheon.corp.google.com/logs/query;query=resource.type%3D%22k8s_container%22%20resource.labels.cluster_name%3D%22` + + cluster + + `%22%20resource.labels.container_name%3D%22worker%22` + projectParam = "?project=" + cfg.ProjectID + } + + logURL := func(traceID string) string { + if logURLBase == "" { + return "" + } + var tracePart string + if traceID != "" { + tracePart = url.PathEscape(fmt.Sprintf(` trace="%s"`, traceID)) + } + return logURLBase + tracePart + projectParam + } + return template.New(filename).Funcs(template.FuncMap{ "truncate": truncate, "timefmt": formatTime, @@ -770,6 +816,7 @@ func parseTemplate(staticPath template.TrustedSource, filename string) (*templat "timeSub": func(t1, t2 time.Time) time.Duration { return t1.Sub(t2).Round(time.Second) }, + "logURL": logURL, }).ParseFilesFromTrustedSources(templatePath) } diff --git a/static/worker/index.tmpl b/static/worker/index.tmpl index 687f44b27..e197f57aa 100644 --- a/static/worker/index.tmpl +++ b/static/worker/index.tmpl @@ -23,7 +23,7 @@ target="_blank" rel="noreferrer"> Scheduler | - Logs @@ -176,6 +176,10 @@ {{.Version}} {{.ZipSize | bytesToMi}} {{timeSince .Start}} + {{with .RequestInfo.TraceID}} + Logs + Cancel + {{end}} {{end}} {{end}} @@ -183,6 +187,31 @@ +
+

Other Requests

+ + + + + + + + + {{range .OtherRequests}} + + + + {{with .TraceID}} + + + {{end}} + + {{end}} + +
URLAge
{{.TrimmedURL}}{{timeSince .Start}}LogsCancel
+
+ +