diff --git a/devtools/cmd/db/main.go b/devtools/cmd/db/main.go index d2e5a03a7..3574df9f7 100644 --- a/devtools/cmd/db/main.go +++ b/devtools/cmd/db/main.go @@ -8,12 +8,15 @@ package main import ( "context" + "database/sql" "flag" "fmt" "os" "strings" + "time" _ "github.com/jackc/pgx/v4/stdlib" // for pgx driver + "github.com/lib/pq" "golang.org/x/pkgsite/internal/config/serverconfig" "golang.org/x/pkgsite/internal/database" "golang.org/x/pkgsite/internal/log" @@ -43,6 +46,7 @@ func main() { if err != nil { log.Fatal(ctx, err) } + log.SetLevel("info") dbName := serverconfig.GetEnv("GO_DISCOVERY_DATABASE_NAME", "discovery-db") if err := run(ctx, flag.Args()[0], dbName, cfg.DBConnInfo()); err != nil { @@ -62,6 +66,8 @@ func run(ctx context.Context, cmd, dbName, connectionInfo string) error { return recreate(ctx, dbName) case "truncate": return truncate(ctx, connectionInfo) + case "waiting": + return waiting(ctx, connectionInfo) default: return fmt.Errorf("unsupported arg: %q", cmd) } @@ -123,3 +129,96 @@ func truncate(ctx context.Context, connectionInfo string) error { defer ddb.Close() return database.ResetDB(ctx, ddb) } + +type ProcessInfo struct { + pid int64 + start time.Time + state string + waitEventType *string + waitEvent *string + blockingPIDs []int64 + query string + pos int +} + +func waiting(ctx context.Context, connectionInfo string) error { + var processInfos []*ProcessInfo + db, err := database.Open("pgx", connectionInfo, "dbadmin") + if err != nil { + return err + } + defer db.Close() + + query := ` + SELECT pid, query_start, state, wait_event_type, wait_event, pg_blocking_pids(pid), query + FROM pg_stat_activity + WHERE usename='worker' + ORDER BY 2 + ` + + err = db.RunQuery(ctx, query, func(rows *sql.Rows) error { + var pi ProcessInfo + if err := rows.Scan(&pi.pid, &pi.start, &pi.state, &pi.waitEventType, &pi.waitEvent, pq.Array(&pi.blockingPIDs), &pi.query); err != nil { + return err + } + processInfos = append(processInfos, &pi) + return nil + }) + if err != nil { + return err + } + + byPid := map[int64]*ProcessInfo{} + for _, pi := range processInfos { + byPid[pi.pid] = pi + } + sorted := topoSort(processInfos, byPid) + for i, p := range sorted { + p.pos = i + 1 + } + for _, pi := range sorted { + var wps []int + for _, w := range pi.blockingPIDs { + wps = append(wps, byPid[w].pos) + } + pi.query = strings.TrimSpace(pi.query) + pi.query = strings.Join(strings.Fields(pi.query), " ") + secs := time.Since(pi.start).Seconds() + mins := int(secs / 60) + secs -= float64(mins) * 60 + fmt.Printf("%3d %d %2d:%2.3fs %v\t %s\n", pi.pos, pi.pid, mins, secs, wps, left(pi.query, 50)) + } + return nil +} + +func left(s string, n int) string { + if len(s) < n { + return s + } + return s[:n] +} + +func topoSort(items []*ProcessInfo, byPid map[int64]*ProcessInfo) []*ProcessInfo { + var res []*ProcessInfo + + visited := map[*ProcessInfo]bool{} + var visit func(*ProcessInfo) + visit = func(pi *ProcessInfo) { + if visited[pi] { + return + } + visited[pi] = true + for _, bpid := range pi.blockingPIDs { + visit(byPid[bpid]) + } + res = append(res, pi) + + } + for _, it := range items { + if !visited[it] { + visit(it) + } + } + + return res +}