Skip to content

Commit

Permalink
[devel] Collect from various pg_stat_progress_* views.
Browse files Browse the repository at this point in the history
  • Loading branch information
mdevan committed May 10, 2022
1 parent b4c5070 commit 68bc6bb
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 1 deletion.
207 changes: 207 additions & 0 deletions collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,19 @@ func (c *collector) collectCluster(o CollectConfig) {
c.getWAL()
}

// various pg_stat_progress_* views
if c.version >= pgv12 {
c.getProgressCluster()
c.getProgressCreateIndex()
}
if c.version >= pgv13 {
c.getProgressAnalyze()
c.getProgressBasebackup()
}
if c.version >= pgv14 {
c.getProgressCopy()
}

if !arrayHas(o.Omit, "log") && c.local {
c.getLogInfo()
}
Expand Down Expand Up @@ -2417,6 +2430,200 @@ func (c *collector) getWAL() {
c.result.WAL = &w
}

// getProgressAnalyze reads pg_stat_progress_analyze (one row per backend
// currently running ANALYZE, pg >= 13) and stores the result in
// c.result.AnalyzeProgress. A failed query is logged as a warning and the
// collection is skipped; scan/row errors are fatal.
func (c *collector) getProgressAnalyze() {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// NULL-able progress counters are coalesced to 0 so they scan into
	// plain Go integers.
	const query = `SELECT pid, datname, relid::int, COALESCE(phase, ''),
		COALESCE(sample_blks_total, 0::bigint),
		COALESCE(sample_blks_scanned, 0::bigint),
		COALESCE(ext_stats_total, 0::bigint),
		COALESCE(ext_stats_computed, 0::bigint),
		COALESCE(child_tables_total, 0::bigint),
		COALESCE(child_tables_done, 0::bigint),
		COALESCE(current_child_table_relid::int, 0::int)
	  FROM pg_stat_progress_analyze
	  ORDER BY pid ASC`

	rows, err := c.db.QueryContext(ctx, query)
	if err != nil {
		log.Printf("warning: pg_stat_progress_analyze query failed: %v", err)
		return
	}
	defer rows.Close()

	var backends []pgmetrics.AnalyzeProgressBackend
	for rows.Next() {
		var b pgmetrics.AnalyzeProgressBackend
		err := rows.Scan(&b.PID, &b.DBName, &b.RelOID, &b.Phase,
			&b.SampleBlocksTotal, &b.SampleBlocksScanned, &b.ExtStatsTotal,
			&b.ExtStatsComputed, &b.ChildTablesTotal, &b.ChildTablesDone,
			&b.CurrentChildTableRelOID)
		if err != nil {
			log.Fatalf("pg_stat_progress_analyze query scan failed: %v", err)
		}
		backends = append(backends, b)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("pg_stat_progress_analyze query rows failed: %v", err)
	}

	c.result.AnalyzeProgress = backends
}

// getProgressBasebackup reads pg_stat_progress_basebackup (one row per WAL
// sender streaming a base backup, pg >= 13) and stores the result in
// c.result.BasebackupProgress. A failed query is logged as a warning and
// the collection is skipped; scan/row errors are fatal.
func (c *collector) getProgressBasebackup() {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	const query = `SELECT pid, COALESCE(phase, ''),
		COALESCE(backup_total, 0::bigint),
		COALESCE(backup_streamed, 0::bigint),
		COALESCE(tablespaces_total, 0::bigint),
		COALESCE(tablespaces_streamed, 0::bigint)
	  FROM pg_stat_progress_basebackup
	  ORDER BY pid ASC`

	rows, err := c.db.QueryContext(ctx, query)
	if err != nil {
		log.Printf("warning: pg_stat_progress_basebackup query failed: %v", err)
		return
	}
	defer rows.Close()

	var backends []pgmetrics.BasebackupProgressBackend
	for rows.Next() {
		var b pgmetrics.BasebackupProgressBackend
		err := rows.Scan(&b.PID, &b.Phase, &b.BackupTotal, &b.BackupStreamed,
			&b.TablespacesTotal, &b.TablespacesStreamed)
		if err != nil {
			log.Fatalf("pg_stat_progress_basebackup query scan failed: %v", err)
		}
		backends = append(backends, b)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("pg_stat_progress_basebackup query rows failed: %v", err)
	}

	c.result.BasebackupProgress = backends
}

// getProgressCluster reads pg_stat_progress_cluster (one row per backend
// currently running CLUSTER or VACUUM FULL, pg >= 12) and stores the result
// in c.result.ClusterProgress. A failed query is logged as a warning and
// the collection is skipped; scan/row errors are fatal.
func (c *collector) getProgressCluster() {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	const query = `SELECT pid, datname, relid::int, COALESCE(command, ''),
		COALESCE(phase, ''),
		COALESCE(cluster_index_relid::int, 0),
		COALESCE(heap_tuples_scanned, 0::bigint),
		COALESCE(heap_tuples_written, 0::bigint),
		COALESCE(heap_blks_total, 0::bigint),
		COALESCE(heap_blks_scanned, 0::bigint),
		COALESCE(index_rebuild_count::int, 0::int)
	  FROM pg_stat_progress_cluster
	  ORDER BY pid ASC`

	rows, err := c.db.QueryContext(ctx, query)
	if err != nil {
		log.Printf("warning: pg_stat_progress_cluster query failed: %v", err)
		return
	}
	defer rows.Close()

	var backends []pgmetrics.ClusterProgressBackend
	for rows.Next() {
		var b pgmetrics.ClusterProgressBackend
		err := rows.Scan(&b.PID, &b.DBName, &b.RelOID, &b.Command, &b.Phase,
			&b.ClusterIndexOID, &b.HeapTuplesScanned, &b.HeapTuplesWritten,
			&b.HeapBlksTotal, &b.HeapBlksScanned, &b.IndexRebuildCount)
		if err != nil {
			log.Fatalf("pg_stat_progress_cluster query scan failed: %v", err)
		}
		backends = append(backends, b)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("pg_stat_progress_cluster query rows failed: %v", err)
	}

	c.result.ClusterProgress = backends
}

// getProgressCopy reads pg_stat_progress_copy (one row per backend currently
// running COPY, pg >= 14) and stores the result in c.result.CopyProgress.
// A failed query is logged as a warning and the collection is skipped;
// scan/row errors are fatal.
func (c *collector) getProgressCopy() {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	const query = `SELECT pid, datname, relid::int, COALESCE(command, ''),
		COALESCE(type, ''),
		COALESCE(bytes_processed, 0::bigint),
		COALESCE(bytes_total, 0::bigint),
		COALESCE(tuples_processed, 0::bigint),
		COALESCE(tuples_excluded, 0::bigint)
	  FROM pg_stat_progress_copy
	  ORDER BY pid ASC`

	rows, err := c.db.QueryContext(ctx, query)
	if err != nil {
		log.Printf("warning: pg_stat_progress_copy query failed: %v", err)
		return
	}
	defer rows.Close()

	var backends []pgmetrics.CopyProgressBackend
	for rows.Next() {
		var b pgmetrics.CopyProgressBackend
		err := rows.Scan(&b.PID, &b.DBName, &b.RelOID, &b.Command, &b.Type,
			&b.BytesProcessed, &b.BytesTotal, &b.TuplesProcessed,
			&b.TuplesExcluded)
		if err != nil {
			log.Fatalf("pg_stat_progress_copy query scan failed: %v", err)
		}
		backends = append(backends, b)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("pg_stat_progress_copy query rows failed: %v", err)
	}

	c.result.CopyProgress = backends
}

// getProgressCreateIndex reads pg_stat_progress_create_index (one row per
// backend currently running CREATE INDEX / REINDEX, pg >= 12) and stores the
// result in c.result.CreateIndexProgress. A failed query is logged as a
// warning and the collection is skipped; scan/row errors are fatal.
func (c *collector) getProgressCreateIndex() {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// NOTE: command and phase are COALESCE'd to '' for consistency with the
	// other pg_stat_progress_* collectors — without it, a NULL in either
	// column would fail the string Scan below and abort the entire run via
	// log.Fatalf.
	q := `SELECT pid, datname, relid::int, index_relid::int,
			COALESCE(command, ''),
			COALESCE(phase, ''),
			COALESCE(lockers_total, 0::bigint),
			COALESCE(lockers_done, 0::bigint),
			COALESCE(current_locker_pid::int, 0::int),
			COALESCE(blocks_total, 0::bigint),
			COALESCE(blocks_done, 0::bigint),
			COALESCE(tuples_total, 0::bigint),
			COALESCE(tuples_done, 0::bigint),
			COALESCE(partitions_total, 0::bigint),
			COALESCE(partitions_done, 0::bigint)
		  FROM pg_stat_progress_create_index
		  ORDER BY pid ASC`

	rows, err := c.db.QueryContext(ctx, q)
	if err != nil {
		log.Printf("warning: pg_stat_progress_create_index query failed: %v", err)
		return
	}
	defer rows.Close()

	var out []pgmetrics.CreateIndexProgressBackend
	for rows.Next() {
		var r pgmetrics.CreateIndexProgressBackend
		if err := rows.Scan(&r.PID, &r.DBName, &r.TableOID, &r.IndexOID,
			&r.Command, &r.Phase, &r.LockersTotal, &r.LockersDone,
			&r.CurrentLockerPID, &r.BlocksTotal, &r.BlocksDone,
			&r.TuplesTotal, &r.TuplesDone, &r.PartitionsTotal,
			&r.PartitionsDone); err != nil {
			log.Fatalf("pg_stat_progress_create_index query scan failed: %v", err)
		}
		out = append(out, r)
	}
	if err := rows.Err(); err != nil {
		log.Fatalf("pg_stat_progress_create_index query rows failed: %v", err)
	}

	c.result.CreateIndexProgress = out
}

//------------------------------------------------------------------------------
// PgBouncer

Expand Down
96 changes: 95 additions & 1 deletion model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package pgmetrics

// ModelSchemaVersion is the schema version of the "Model" data structure
// defined below. It is in the "semver" notation. Version history:
// 1.12 - Azure metrics, queryid in plan
// 1.12 - Azure metrics, queryid in plan, progress views
// 1.11 - Postgres 14, PgBouncer 1.16, other attributes
// 1.10 - New fields in pg_stat_statements for Postgres 13
// 1.9 - Postgres 13, Citus support
Expand Down Expand Up @@ -163,6 +163,13 @@ type Model struct {

// metrics from Azure PostgreSQL, via Azure Monitor APIs
Azure *Azure `json:"azure,omitempty"`

// progress information from pg_stat_progress_* (see above for vacuum)
AnalyzeProgress []AnalyzeProgressBackend `json:"analyze_progress,omitempty"`
BasebackupProgress []BasebackupProgressBackend `json:"basebackup_progress,omitempty"`
ClusterProgress []ClusterProgressBackend `json:"cluster_progress,omitempty"`
CopyProgress []CopyProgressBackend `json:"copy_progress,omitempty"`
CreateIndexProgress []CreateIndexProgressBackend `json:"create_index_progress,omitempty"`
}

// DatabaseByOID iterates over the databases in the model and returns the reference
Expand Down Expand Up @@ -813,3 +820,90 @@ type Azure struct {
ResourceRegion string `json:"resource_region"`
Metrics map[string]float64 `json:"metrics"`
}

// AnalyzeProgressBackend represents a row (and each row represents one
// backend) from pg_stat_progress_analyze.
//
// pg >= 13, schema >= 1.12, pgmetrics >= 1.13.0
type AnalyzeProgressBackend struct {
	PID                     int    `json:"pid"`                 // backend process ID
	DBName                  string `json:"db_name"`             // database the backend is connected to
	RelOID                  int    `json:"rel_oid"`             // OID of the table being analyzed
	Phase                   string `json:"phase"`               // current processing phase ('' if NULL)
	SampleBlocksTotal       int64  `json:"sample_blks_total"`   // total heap blocks to sample (0 if NULL)
	SampleBlocksScanned     int64  `json:"sample_blks_scanned"` // heap blocks scanned so far (0 if NULL)
	ExtStatsTotal           int64  `json:"ext_stats_total"`     // extended statistics to compute (0 if NULL)
	ExtStatsComputed        int64  `json:"ext_stats_computed"`  // extended statistics computed so far (0 if NULL)
	ChildTablesTotal        int64  `json:"child_tables_total"`  // child tables to process (0 if NULL)
	ChildTablesDone         int64  `json:"child_tables_done"`   // child tables processed so far (0 if NULL)
	CurrentChildTableRelOID int    `json:"child_oid"`           // OID of the child table being scanned (0 if NULL)
}

// BasebackupProgressBackend represents a row (and each row represents one
// backend) from pg_stat_progress_basebackup.
//
// pg >= 13, schema >= 1.12, pgmetrics >= 1.13.0
type BasebackupProgressBackend struct {
	PID                 int    `json:"pid"`                  // process ID of the WAL sender
	Phase               string `json:"phase"`                // current processing phase ('' if NULL)
	BackupTotal         int64  `json:"backup_total"`         // total amount to stream (0 if NULL)
	BackupStreamed      int64  `json:"backup_streamed"`      // amount streamed so far (0 if NULL)
	TablespacesTotal    int64  `json:"tablespaces_total"`    // tablespaces to stream (0 if NULL)
	TablespacesStreamed int64  `json:"tablespaces_streamed"` // tablespaces streamed so far (0 if NULL)
}

// ClusterProgressBackend represents a row (and each row represents one
// backend) from pg_stat_progress_cluster.
//
// pg >= 12, schema >= 1.12, pgmetrics >= 1.13.0
type ClusterProgressBackend struct {
	PID     int    `json:"pid"`     // backend process ID
	DBName  string `json:"db_name"` // database the backend is connected to
	RelOID  int    `json:"rel_oid"` // OID of the table being clustered
	Command string `json:"command"` // command running: CLUSTER or VACUUM FULL ('' if NULL)
	Phase   string `json:"phase"`   // current processing phase ('' if NULL)
	// NOTE(review): the json tag below is missing the 't' in "cluster";
	// fixing the tag would change the emitted JSON schema and could break
	// existing consumers — confirm downstream impact before renaming.
	ClusterIndexOID   int   `json:"cluser_index_oid"`    // OID of the index used for clustering (0 if NULL)
	HeapTuplesScanned int64 `json:"heap_tuples_scanned"` // heap tuples scanned so far (0 if NULL)
	HeapTuplesWritten int64 `json:"heap_tuples_written"` // heap tuples written so far (0 if NULL)
	HeapBlksTotal     int64 `json:"heap_blks_total"`     // total heap blocks to scan (0 if NULL)
	HeapBlksScanned   int64 `json:"heap_blks_scanned"`   // heap blocks scanned so far (0 if NULL)
	IndexRebuildCount int   `json:"index_rebuild_count"` // indexes rebuilt so far (0 if NULL)
}

// CopyProgressBackend represents a row (and each row represents one
// backend) from pg_stat_progress_copy.
//
// pg >= 14, schema >= 1.12, pgmetrics >= 1.13.0
type CopyProgressBackend struct {
	PID             int    `json:"pid"`              // backend process ID
	DBName          string `json:"db_name"`          // database the backend is connected to
	RelOID          int    `json:"rel_oid"`          // OID of the table the COPY targets
	Command         string `json:"command"`          // command running: COPY FROM or COPY TO ('' if NULL)
	Type            string `json:"type"`             // I/O type of the COPY ('' if NULL)
	BytesProcessed  int64  `json:"bytes_processed"`  // bytes processed so far (0 if NULL)
	BytesTotal      int64  `json:"bytes_total"`      // total bytes to process (0 if NULL)
	TuplesProcessed int64  `json:"tuples_processed"` // tuples processed so far (0 if NULL)
	TuplesExcluded  int64  `json:"tuples_excluded"`  // tuples excluded by a WHERE clause (0 if NULL)
}

// CreateIndexProgressBackend represents a row (and each row represents one
// backend) from pg_stat_progress_create_index.
//
// pg >= 12, schema >= 1.12, pgmetrics >= 1.13.0
type CreateIndexProgressBackend struct {
	PID              int    `json:"pid"`                // backend process ID
	DBName           string `json:"db_name"`            // database the backend is connected to
	TableOID         int    `json:"table_oid"`          // OID of the table being indexed
	IndexOID         int    `json:"index_oid"`          // OID of the index being created/rebuilt
	Command          string `json:"command"`            // command running (e.g. CREATE INDEX)
	Phase            string `json:"phase"`              // current processing phase
	LockersTotal     int64  `json:"lockers_total"`      // lockers to wait for (0 if NULL)
	LockersDone      int64  `json:"lockers_done"`       // lockers already waited for (0 if NULL)
	CurrentLockerPID int    `json:"current_locker_pid"` // PID of the locker currently waited for (0 if NULL)
	BlocksTotal      int64  `json:"blocks_total"`       // total blocks in the current phase (0 if NULL)
	BlocksDone       int64  `json:"blocks_done"`        // blocks processed in the current phase (0 if NULL)
	TuplesTotal      int64  `json:"tuples_total"`       // total tuples in the current phase (0 if NULL)
	TuplesDone       int64  `json:"tuples_done"`        // tuples processed in the current phase (0 if NULL)
	PartitionsTotal  int64  `json:"partitions_total"`   // total partitions to index (0 if NULL)
	PartitionsDone   int64  `json:"partitions_done"`    // partitions indexed so far (0 if NULL)
}

0 comments on commit 68bc6bb

Please sign in to comment.