diff --git a/cmd/pgmetrics/report.go b/cmd/pgmetrics/report.go index 43713b5..16d6ee5 100644 --- a/cmd/pgmetrics/report.go +++ b/cmd/pgmetrics/report.go @@ -144,6 +144,7 @@ PostgreSQL Cluster: if version >= pgv96 { reportVacuumProgress(fd, result) } + reportProgress(fd, result) reportRoles(fd, result) reportTablespaces(fd, result) reportDatabases(fd, result) @@ -624,6 +625,75 @@ Vacuum Progress:`) tw.write(fd, " ") } +func reportProgress(fd io.Writer, result *pgmetrics.Model) { + if len(result.VacuumProgress)+len(result.AnalyzeProgress)+ + len(result.BasebackupProgress)+len(result.ClusterProgress)+ + len(result.CopyProgress)+len(result.CreateIndexProgress) == 0 { + return // no jobs in progress + } + + var tw tableWriter + tw.add("Job", "Backend", "Working On", "Status") + + // analyze + for _, a := range result.AnalyzeProgress { + object := "?" + if t := result.TableByOID(a.TableOID); t != nil { + object = a.DBName + "." + t.Name + } + tw.add("ANALYZE", a.PID, object, a.Phase) + } + + // basebackup + for _, b := range result.BasebackupProgress { + tw.add("BASEBACKUP", b.PID, "", b.Phase) + } + + // cluster / vacuum full + for _, c := range result.ClusterProgress { + object := "?" + if t := result.TableByOID(c.TableOID); t != nil { + object = c.DBName + "." + t.Name + } + tw.add(c.Command, c.PID, object, c.Phase) + } + + // copy from / copy to + for _, c := range result.CopyProgress { + object := "(query)" + if t := result.TableByOID(c.TableOID); t != nil { + object = c.DBName + "." + t.Name + } + tw.add(c.Command, c.PID, object, "") + } + + // create index (concurrently) / reindex (concurrently) + for _, c := range result.CreateIndexProgress { + object := "?" + if t := result.TableByOID(c.TableOID); t != nil { + object = c.DBName + "." + t.Name + if idx := result.IndexByOID(c.IndexOID); idx != nil { + object += "." + idx.Name + } + } + tw.add(c.Command, c.PID, object, c.Phase) + } + + // vacuum + for _, v := range result.VacuumProgress { + object := "?" + if t := result.TableByOID(v.TableOID); t != nil { + object = v.DBName + "." + t.Name + } + tw.add("VACUUM", v.PID, object, v.Phase) + } + + fmt.Fprint(fd, ` +Jobs In Progress: +`) + tw.write(fd, " ") +} + func reportRoles(fd io.Writer, result *pgmetrics.Model) { fmt.Fprint(fd, ` Roles: diff --git a/collector/collect.go b/collector/collect.go index ca5e3c4..7c14159 100644 --- a/collector/collect.go +++ b/collector/collect.go @@ -1669,7 +1669,7 @@ func (c *collector) getVacuumProgress() { ctx, cancel := context.WithTimeout(context.Background(), c.timeout) defer cancel() - q := `SELECT datname, COALESCE(relid, 0), COALESCE(phase, ''), + q := `SELECT pid, datname, COALESCE(relid, 0), COALESCE(phase, ''), COALESCE(heap_blks_total, 0), COALESCE(heap_blks_scanned, 0), COALESCE(heap_blks_vacuumed, 0), COALESCE(index_vacuum_count, 0), COALESCE(max_dead_tuples, 0), COALESCE(num_dead_tuples, 0) @@ -1683,7 +1683,7 @@ func (c *collector) getVacuumProgress() { for rows.Next() { var p pgmetrics.VacuumProgressBackend - if err := rows.Scan(&p.DBName, &p.TableOID, &p.Phase, &p.HeapBlksTotal, + if err := rows.Scan(&p.PID, &p.DBName, &p.TableOID, &p.Phase, &p.HeapBlksTotal, &p.HeapBlksScanned, &p.HeapBlksVacuumed, &p.IndexVacuumCount, &p.MaxDeadTuples, &p.NumDeadTuples); err != nil { log.Fatalf("pg_stat_progress_vacuum query failed: %v", err) @@ -2460,7 +2460,7 @@ func (c *collector) getProgressAnalyze() { var out []pgmetrics.AnalyzeProgressBackend for rows.Next() { var r pgmetrics.AnalyzeProgressBackend - if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Phase, + if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Phase, &r.SampleBlocksTotal, &r.SampleBlocksScanned, &r.ExtStatsTotal, &r.ExtStatsComputed, &r.ChildTablesTotal, &r.ChildTablesDone, &r.CurrentChildTableRelOID); err != nil { @@ -2535,7 +2535,7 @@ func (c *collector) getProgressCluster() { var out []pgmetrics.ClusterProgressBackend for rows.Next() { var r pgmetrics.ClusterProgressBackend - if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Command, &r.Phase, + if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Command, &r.Phase, &r.ClusterIndexOID, &r.HeapTuplesScanned, &r.HeapTuplesWritten, &r.HeapBlksTotal, &r.HeapBlksScanned, &r.IndexRebuildCount); err != nil { log.Fatalf("pg_stat_progress_cluster query scan failed: %v", err) @@ -2572,7 +2572,7 @@ func (c *collector) getProgressCopy() { var out []pgmetrics.CopyProgressBackend for rows.Next() { var r pgmetrics.CopyProgressBackend - if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Command, &r.Type, + if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Command, &r.Type, &r.BytesProcessed, &r.BytesTotal, &r.TuplesProcessed, &r.TuplesExcluded); err != nil { log.Fatalf("pg_stat_progress_copy query scan failed: %v", err) diff --git a/model.go b/model.go index 36d7cf9..c09034e 100644 --- a/model.go +++ b/model.go @@ -229,6 +229,17 @@ func (m *Model) IndexByName(db, schema, index string) *Index { return nil } +// IndexByOID iterates over the indexes in the model and returns the reference +// to an Index that has the given oid. If there is no such index, it returns nil. +func (m *Model) IndexByOID(oid int) *Index { + for i, idx := range m.Indexes { + if idx.OID == oid { + return &m.Indexes[i] + } + } + return nil +} + // Metadata contains information about how to interpret the other fields in // "Model" data structure. type Metadata struct { @@ -457,6 +468,8 @@ type VacuumProgressBackend struct { IndexVacuumCount int64 `json:"index_vacuum_count"` MaxDeadTuples int64 `json:"max_dead_tuples"` NumDeadTuples int64 `json:"num_dead_tuples"` + // following fields present only in schema 1.12 and later + PID int `json:"pid,omitempty"` } type Extension struct { @@ -828,7 +841,7 @@ type Azure struct { type AnalyzeProgressBackend struct { PID int `json:"pid"` DBName string `json:"db_name"` - RelOID int `json:"rel_oid"` + TableOID int `json:"table_oid"` Phase string `json:"phase"` SampleBlocksTotal int64 `json:"sample_blks_total"` SampleBlocksScanned int64 `json:"sample_blks_scanned"` @@ -859,7 +872,7 @@ type BasebackupProgressBackend struct { type ClusterProgressBackend struct { PID int `json:"pid"` DBName string `json:"db_name"` - RelOID int `json:"rel_oid"` + TableOID int `json:"table_oid"` Command string `json:"command"` Phase string `json:"phase"` ClusterIndexOID int `json:"cluser_index_oid"` @@ -877,7 +890,7 @@ type ClusterProgressBackend struct { type CopyProgressBackend struct { PID int `json:"pid"` DBName string `json:"db_name"` - RelOID int `json:"rel_oid"` + TableOID int `json:"table_oid"` Command string `json:"command"` Type string `json:"type"` BytesProcessed int64 `json:"bytes_processed"`