Skip to content

Commit

Permalink
Handle new pg_stat_statements column names (#874)
Browse files Browse the repository at this point in the history
Update pg_stat_statements collector to handle the new column names in
PostgreSQL 13.

Fixes: #502

Signed-off-by: SuperQ <superq@gmail.com>
  • Loading branch information
SuperQ authored Jul 25, 2023
1 parent 74800f4 commit f9277b0
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
30 changes: 28 additions & 2 deletions collector/pg_stat_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"

"github.com/blang/semver/v4"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -90,12 +91,37 @@ var (
)
ORDER BY seconds_total DESC
LIMIT 100;`

pgStatStatementsNewQuery = `SELECT
pg_get_userbyid(userid) as user,
pg_database.datname,
pg_stat_statements.queryid,
pg_stat_statements.calls as calls_total,
pg_stat_statements.total_exec_time / 1000.0 as seconds_total,
pg_stat_statements.rows as rows_total,
pg_stat_statements.blk_read_time / 1000.0 as block_read_seconds_total,
pg_stat_statements.blk_write_time / 1000.0 as block_write_seconds_total
FROM pg_stat_statements
JOIN pg_database
ON pg_database.oid = pg_stat_statements.dbid
WHERE
total_time > (
SELECT percentile_cont(0.1)
WITHIN GROUP (ORDER BY total_time)
FROM pg_stat_statements
)
ORDER BY seconds_total DESC
LIMIT 100;`
)

func (PGStatStatementsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
query := pgStatStatementsQuery
if instance.version.GE(semver.MustParse("13.0.0")) {
query = pgStatStatementsNewQuery
}

db := instance.getDB()
rows, err := db.QueryContext(ctx,
pgStatStatementsQuery)
rows, err := db.QueryContext(ctx, query)

if err != nil {
return err
Expand Down
50 changes: 47 additions & 3 deletions collector/pg_stat_statements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
Expand All @@ -29,7 +30,7 @@ func TestPGStateStatementsCollector(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("12.0.0")}

columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
Expand Down Expand Up @@ -72,12 +73,12 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
}
defer db.Close()

inst := &instance{db: db}
inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsQuery)).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down Expand Up @@ -107,3 +108,46 @@ func TestPGStateStatementsCollectorNull(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPGStateStatementsCollectorNewPG(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(pgStatStatementsNewQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 5},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.4},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 100},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.1},
{labels: labelMap{"user": "postgres", "datname": "postgres", "queryid": "1500"}, metricType: dto.MetricType_COUNTER, value: 0.2},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

0 comments on commit f9277b0

Please sign in to comment.