Skip to content

Commit

Permalink
[devel] Include all job progress in report; tweak fields in model.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed May 16, 2022
1 parent 070c8cb commit 5fc59d0
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
70 changes: 70 additions & 0 deletions cmd/pgmetrics/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ PostgreSQL Cluster:
if version >= pgv96 {
reportVacuumProgress(fd, result)
}
reportProgress(fd, result)
reportRoles(fd, result)
reportTablespaces(fd, result)
reportDatabases(fd, result)
Expand Down Expand Up @@ -624,6 +625,75 @@ Vacuum Progress:`)
tw.write(fd, " ")
}

// reportProgress writes a "Jobs In Progress:" table listing every backend
// currently reported by the pg_stat_progress_* views in the model: ANALYZE,
// BASEBACKUP, CLUSTER/VACUUM FULL, COPY, CREATE INDEX/REINDEX and VACUUM.
// Nothing is written when no job is in progress.
func reportProgress(fd io.Writer, result *pgmetrics.Model) {
	total := len(result.VacuumProgress) + len(result.AnalyzeProgress) +
		len(result.BasebackupProgress) + len(result.ClusterProgress) +
		len(result.CopyProgress) + len(result.CreateIndexProgress)
	if total == 0 {
		return // no jobs in progress
	}

	// qualify returns "dbname.tablename" for the table with the given OID,
	// or fallback when the table is not present in the model.
	qualify := func(db string, oid int, fallback string) string {
		if t := result.TableByOID(oid); t != nil {
			return db + "." + t.Name
		}
		return fallback
	}

	var tw tableWriter
	tw.add("Job", "Backend", "Working On", "Status")

	// analyze
	for _, a := range result.AnalyzeProgress {
		tw.add("ANALYZE", a.PID, qualify(a.DBName, a.TableOID, "?"), a.Phase)
	}

	// basebackup (no target object to show)
	for _, b := range result.BasebackupProgress {
		tw.add("BASEBACKUP", b.PID, "", b.Phase)
	}

	// cluster / vacuum full
	for _, c := range result.ClusterProgress {
		tw.add(c.Command, c.PID, qualify(c.DBName, c.TableOID, "?"), c.Phase)
	}

	// copy from / copy to ("(query)" when not copying a named table)
	for _, c := range result.CopyProgress {
		tw.add(c.Command, c.PID, qualify(c.DBName, c.TableOID, "(query)"), "")
	}

	// create index (concurrently) / reindex (concurrently); the index name
	// is appended when it can be resolved, but only if the table resolved too
	for _, c := range result.CreateIndexProgress {
		object := "?"
		if t := result.TableByOID(c.TableOID); t != nil {
			object = c.DBName + "." + t.Name
			if idx := result.IndexByOID(c.IndexOID); idx != nil {
				object += "." + idx.Name
			}
		}
		tw.add(c.Command, c.PID, object, c.Phase)
	}

	// vacuum
	for _, v := range result.VacuumProgress {
		tw.add("VACUUM", v.PID, qualify(v.DBName, v.TableOID, "?"), v.Phase)
	}

	fmt.Fprint(fd, `
Jobs In Progress:
`)
	tw.write(fd, "    ")
}

func reportRoles(fd io.Writer, result *pgmetrics.Model) {
fmt.Fprint(fd, `
Roles:
Expand Down
10 changes: 5 additions & 5 deletions collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -1669,7 +1669,7 @@ func (c *collector) getVacuumProgress() {
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

q := `SELECT datname, COALESCE(relid, 0), COALESCE(phase, ''),
q := `SELECT pid, datname, COALESCE(relid, 0), COALESCE(phase, ''),
COALESCE(heap_blks_total, 0), COALESCE(heap_blks_scanned, 0),
COALESCE(heap_blks_vacuumed, 0), COALESCE(index_vacuum_count, 0),
COALESCE(max_dead_tuples, 0), COALESCE(num_dead_tuples, 0)
Expand All @@ -1683,7 +1683,7 @@ func (c *collector) getVacuumProgress() {

for rows.Next() {
var p pgmetrics.VacuumProgressBackend
if err := rows.Scan(&p.DBName, &p.TableOID, &p.Phase, &p.HeapBlksTotal,
if err := rows.Scan(&p.PID, &p.DBName, &p.TableOID, &p.Phase, &p.HeapBlksTotal,
&p.HeapBlksScanned, &p.HeapBlksVacuumed, &p.IndexVacuumCount,
&p.MaxDeadTuples, &p.NumDeadTuples); err != nil {
log.Fatalf("pg_stat_progress_vacuum query failed: %v", err)
Expand Down Expand Up @@ -2460,7 +2460,7 @@ func (c *collector) getProgressAnalyze() {
var out []pgmetrics.AnalyzeProgressBackend
for rows.Next() {
var r pgmetrics.AnalyzeProgressBackend
if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Phase,
if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Phase,
&r.SampleBlocksTotal, &r.SampleBlocksScanned, &r.ExtStatsTotal,
&r.ExtStatsComputed, &r.ChildTablesTotal, &r.ChildTablesDone,
&r.CurrentChildTableRelOID); err != nil {
Expand Down Expand Up @@ -2535,7 +2535,7 @@ func (c *collector) getProgressCluster() {
var out []pgmetrics.ClusterProgressBackend
for rows.Next() {
var r pgmetrics.ClusterProgressBackend
if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Command, &r.Phase,
if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Command, &r.Phase,
&r.ClusterIndexOID, &r.HeapTuplesScanned, &r.HeapTuplesWritten,
&r.HeapBlksTotal, &r.HeapBlksScanned, &r.IndexRebuildCount); err != nil {
log.Fatalf("pg_stat_progress_cluster query scan failed: %v", err)
Expand Down Expand Up @@ -2572,7 +2572,7 @@ func (c *collector) getProgressCopy() {
var out []pgmetrics.CopyProgressBackend
for rows.Next() {
var r pgmetrics.CopyProgressBackend
if err := rows.Scan(&r.PID, &r.DBName, &r.RelOID, &r.Command, &r.Type,
if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.Command, &r.Type,
&r.BytesProcessed, &r.BytesTotal, &r.TuplesProcessed,
&r.TuplesExcluded); err != nil {
log.Fatalf("pg_stat_progress_copy query scan failed: %v", err)
Expand Down
19 changes: 16 additions & 3 deletions model.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,17 @@ func (m *Model) IndexByName(db, schema, index string) *Index {
return nil
}

// IndexByOID iterates over the indexes in the model and returns the reference
// to an Index that has the given oid. If there is no such index, it returns nil.
func (m *Model) IndexByOID(oid int) *Index {
	// Index by position instead of ranging by value: the two-variable form
	// (`for i, idx := range m.Indexes`) copies each Index struct per
	// iteration just to compare one field.
	for i := range m.Indexes {
		if m.Indexes[i].OID == oid {
			return &m.Indexes[i]
		}
	}
	return nil
}

// Metadata contains information about how to interpret the other fields in
// "Model" data structure.
type Metadata struct {
Expand Down Expand Up @@ -457,6 +468,8 @@ type VacuumProgressBackend struct {
IndexVacuumCount int64 `json:"index_vacuum_count"`
MaxDeadTuples int64 `json:"max_dead_tuples"`
NumDeadTuples int64 `json:"num_dead_tuples"`
// following fields present only in schema 1.12 and later
PID int `json:"pid,omitempty"`
}

type Extension struct {
Expand Down Expand Up @@ -828,7 +841,7 @@ type Azure struct {
type AnalyzeProgressBackend struct {
PID int `json:"pid"`
DBName string `json:"db_name"`
RelOID int `json:"rel_oid"`
TableOID int `json:"table_oid"`
Phase string `json:"phase"`
SampleBlocksTotal int64 `json:"sample_blks_total"`
SampleBlocksScanned int64 `json:"sample_blks_scanned"`
Expand Down Expand Up @@ -859,7 +872,7 @@ type BasebackupProgressBackend struct {
type ClusterProgressBackend struct {
PID int `json:"pid"`
DBName string `json:"db_name"`
RelOID int `json:"rel_oid"`
TableOID int `json:"table_oid"`
Command string `json:"command"`
Phase string `json:"phase"`
ClusterIndexOID int `json:"cluser_index_oid"`
Expand All @@ -877,7 +890,7 @@ type ClusterProgressBackend struct {
type CopyProgressBackend struct {
PID int `json:"pid"`
DBName string `json:"db_name"`
RelOID int `json:"rel_oid"`
TableOID int `json:"table_oid"`
Command string `json:"command"`
Type string `json:"type"`
BytesProcessed int64 `json:"bytes_processed"`
Expand Down

0 comments on commit 5fc59d0

Please sign in to comment.