diff --git a/collector/pg_stat_wal.go b/collector/pg_stat_wal.go new file mode 100644 index 000000000..b4eb45dba --- /dev/null +++ b/collector/pg_stat_wal.go @@ -0,0 +1,240 @@ +// Copyright 2024 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "strings" + + "github.com/prometheus/client_golang/prometheus" +) + +const statWALSubsystem = "stat_wal" + +func init() { + registerCollector(statWALSubsystem, defaultDisabled, NewPGStatWALCollector) +} + +type PGStatWALCollector struct { + log *slog.Logger +} + +func NewPGStatWALCollector(config collectorConfig) (Collector, error) { + return &PGStatWALCollector{log: config.logger}, nil +} + +var statsWALRecordsDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_records_total"), + "Total number of WAL records generated", + []string{}, + prometheus.Labels{}, +) + +var statsWALFPIDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_fpi"), + "Total number of WAL full page images generated", + []string{}, + prometheus.Labels{}, +) + +var statsWALBytesDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_bytes"), + "Total amount of WAL generated in bytes", + []string{}, + prometheus.Labels{}, +) + +var statsWALBuffersFullDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_buffers_full"), + "Number of times WAL data was written to disk because WAL buffers became full", + []string{}, + prometheus.Labels{}, +) + +var statsWALWriteDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_write"), + "Number of times WAL buffers were written out to disk via XLogWrite request. See Section 30.5 for more information about the internal WAL function XLogWrite.", + []string{}, + prometheus.Labels{}, +) + +var statsWALSyncDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_sync"), + "Number of times WAL files were synced to disk via issue_xlog_fsync request (if fsync is on and wal_sync_method is either fdatasync, fsync or fsync_writethrough, otherwise zero). See Section 30.5 for more information about the internal WAL function issue_xlog_fsync.", + []string{}, + prometheus.Labels{}, +) + +var statsWALWriteTimeDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_write_time"), + "Total amount of time spent writing WAL buffers to disk via XLogWrite request, in milliseconds (if track_wal_io_timing is enabled, otherwise zero). This includes the sync time when wal_sync_method is either open_datasync or open_sync.", + []string{}, + prometheus.Labels{}, +) + +var statsWALSyncTimeDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "wal_sync_time"), + "Total amount of time spent syncing WAL files to disk via issue_xlog_fsync request, in milliseconds (if track_wal_io_timing is enabled, fsync is on, and wal_sync_method is either fdatasync, fsync or fsync_writethrough, otherwise zero).", + []string{}, + prometheus.Labels{}, +) + +var statsWALStatsResetDesc = prometheus.NewDesc( + prometheus.BuildFQName(namespace, statWALSubsystem, "stats_reset"), + "Time at which these statistics were last reset", + []string{}, + prometheus.Labels{}, +) + +func statWALQuery(columns []string) string { + return fmt.Sprintf("SELECT %s FROM pg_stat_wal;", strings.Join(columns, ",")) +} + +func (c *PGStatWALCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error { + db := instance.getDB() + + columns := []string{ + "wal_records", // bigint + "wal_fpi", // bigint + "wal_bytes", // numeric + "wal_buffers_full", // bigint + "wal_write", // bigint + "wal_sync", // bigint + "wal_write_time", // double precision + "wal_sync_time", // double precision + "stats_reset", // timestamp with time zone + } + + rows, err := db.QueryContext(ctx, + statWALQuery(columns), + ) + if err != nil { + return err + } + defer rows.Close() + + for rows.Next() { + var walRecords, walFPI, walBytes, walBuffersFull, walWrite, walSync sql.NullInt64 + var walWriteTime, walSyncTime sql.NullFloat64 + var statsReset sql.NullTime + + err := rows.Scan( + &walRecords, + &walFPI, + &walBytes, + &walBuffersFull, + &walWrite, + &walSync, + &walWriteTime, + &walSyncTime, + &statsReset, + ) + if err != nil { + return err + } + + walRecordsMetric := 0.0 + if walRecords.Valid { + walRecordsMetric = float64(walRecords.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALRecordsDesc, + prometheus.CounterValue, + walRecordsMetric, + ) + + walFPIMetric := 0.0 + if walFPI.Valid { + walFPIMetric = float64(walFPI.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALFPIDesc, + prometheus.CounterValue, + walFPIMetric, + ) + + walBytesMetric := 0.0 + if walBytes.Valid { + walBytesMetric = float64(walBytes.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALBytesDesc, + prometheus.CounterValue, + walBytesMetric, + ) + + walBuffersFullMetric := 0.0 + if walBuffersFull.Valid { + walBuffersFullMetric = float64(walBuffersFull.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALBuffersFullDesc, + prometheus.CounterValue, + walBuffersFullMetric, + ) + + walWriteMetric := 0.0 + if walWrite.Valid { + walWriteMetric = float64(walWrite.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALWriteDesc, + prometheus.CounterValue, + walWriteMetric, + ) + + walSyncMetric := 0.0 + if walSync.Valid { + walSyncMetric = float64(walSync.Int64) + } + ch <- prometheus.MustNewConstMetric( + statsWALSyncDesc, + prometheus.CounterValue, + walSyncMetric, + ) + + walWriteTimeMetric := 0.0 + if walWriteTime.Valid { + walWriteTimeMetric = float64(walWriteTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statsWALWriteTimeDesc, + prometheus.CounterValue, + walWriteTimeMetric, + ) + + walSyncTimeMetric := 0.0 + if walSyncTime.Valid { + walSyncTimeMetric = float64(walSyncTime.Float64) + } + ch <- prometheus.MustNewConstMetric( + statsWALSyncTimeDesc, + prometheus.CounterValue, + walSyncTimeMetric, + ) + + resetMetric := 0.0 + if statsReset.Valid { + resetMetric = float64(statsReset.Time.Unix()) + } + ch <- prometheus.MustNewConstMetric( + statsWALStatsResetDesc, + prometheus.CounterValue, + resetMetric, + ) + } + return nil +} diff --git a/collector/pg_stat_wal_test.go b/collector/pg_stat_wal_test.go new file mode 100644 index 000000000..79d1bf370 --- /dev/null +++ b/collector/pg_stat_wal_test.go @@ -0,0 +1,144 @@ +// Copyright 2023 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package collector + +import ( + "context" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" + "github.com/smartystreets/goconvey/convey" +) + +func TestPGStatWALCollector(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + + columns := []string{ + "wal_records", // bigint + "wal_fpi", // bigint + "wal_bytes", // numeric + "wal_buffers_full", // bigint + "wal_write", // bigint + "wal_sync", // bigint + "wal_write_time", // double precision + "wal_sync_time", // double precision + "stats_reset", // timestamp with time zone + } + + srT, err := time.Parse("2006-01-02 15:04:05.00000-07", "2023-05-25 17:10:42.81132-07") + if err != nil { + t.Fatalf("Error parsing time: %s", err) + } + + rows := sqlmock.NewRows(columns). + AddRow(354, 4945, 289097744, 1242257, int64(3275602074), 89320867, 450.123439, 1234.5678, srT) + mock.ExpectQuery(sanitizeQuery(statWALQuery(columns))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWALCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatWALCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 354}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 4945}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 289097744}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1242257}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 3275602074}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 89320867}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 450.123439}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1234.5678}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 1685059842}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +} + +func TestPGStatWALCollectorNullValues(t *testing.T) { + db, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("Error opening a stub db connection: %s", err) + } + defer db.Close() + + inst := &instance{db: db} + columns := []string{ + "wal_records", // bigint + "wal_fpi", // bigint + "wal_bytes", // numeric + "wal_buffers_full", // bigint + "wal_write", // bigint + "wal_sync", // bigint + "wal_write_time", // double precision + "wal_sync_time", // double precision + "stats_reset", // timestamp with time zone + } + + rows := sqlmock.NewRows(columns). + AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil) + mock.ExpectQuery(sanitizeQuery(statWALQuery(columns))).WillReturnRows(rows) + + ch := make(chan prometheus.Metric) + go func() { + defer close(ch) + c := PGStatWALCollector{} + + if err := c.Update(context.Background(), inst, ch); err != nil { + t.Errorf("Error calling PGStatWALCollector.Update: %s", err) + } + }() + + expected := []MetricResult{ + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + {labels: labelMap{}, metricType: dto.MetricType_COUNTER, value: 0}, + } + + convey.Convey("Metrics comparison", t, func() { + for _, expect := range expected { + m := readMetric(<-ch) + convey.So(expect, convey.ShouldResemble, m) + } + }) + if err := mock.ExpectationsWereMet(); err != nil { + t.Errorf("there were unfulfilled exceptions: %s", err) + } +}