Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix: Make collector not fail on null values #823

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions collector/pg_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -79,32 +80,41 @@ func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch
var databases []string

for rows.Next() {
var datname string
var datname sql.NullString
SuperQ marked this conversation as resolved.
Show resolved Hide resolved
if err := rows.Scan(&datname); err != nil {
return err
}

if !datname.Valid {
continue
}
// Ignore excluded databases
// Filtering is done here instead of in the query to avoid
// a complicated NOT IN query with a variable number of parameters
if sliceContains(c.excludedDatabases, datname) {
if sliceContains(c.excludedDatabases, datname.String) {
continue
}

databases = append(databases, datname)
databases = append(databases, datname.String)
}

// Query the size of the databases
for _, datname := range databases {
var size int64
var size sql.NullFloat64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also don't see any documentation that says that this field can be NULL>

err = db.QueryRowContext(ctx, pgDatabaseSizeQuery, datname).Scan(&size)
if err != nil {
return err
}

var sizeMetric float64
if size.Valid {
sizeMetric = size.Float64
} else {
sizeMetric = 0
}
Sticksman marked this conversation as resolved.
Show resolved Hide resolved
ch <- prometheus.MustNewConstMetric(
pgDatabaseSizeDesc,
prometheus.GaugeValue, float64(size), datname,
prometheus.GaugeValue, sizeMetric, datname,
)
}
if err := rows.Err(); err != nil {
Expand Down
40 changes: 40 additions & 0 deletions collector/pg_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,43 @@ func TestPGDatabaseCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

// TODO add a null db test

func TestPGDatabaseCollectorNullMetric(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgDatabaseQuery)).WillReturnRows(sqlmock.NewRows([]string{"datname"}).
AddRow("postgres"))

mock.ExpectQuery(sanitizeQuery(pgDatabaseSizeQuery)).WithArgs("postgres").WillReturnRows(sqlmock.NewRows([]string{"pg_database_size"}).
AddRow(nil))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGDatabaseCollector{}
if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGDatabaseCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"datname": "postgres"}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
9 changes: 7 additions & 2 deletions collector/pg_postmaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/prometheus/client_golang/prometheus"
)
Expand Down Expand Up @@ -51,14 +52,18 @@ func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance,
row := db.QueryRowContext(ctx,
pgPostmasterQuery)

var startTimeSeconds float64
var startTimeSeconds sql.NullFloat64
err := row.Scan(&startTimeSeconds)
if err != nil {
return err
}
startTimeSecondsMetric := 0.0
if startTimeSeconds.Valid {
startTimeSecondsMetric = startTimeSeconds.Float64
}
ch <- prometheus.MustNewConstMetric(
pgPostMasterStartTimeSeconds,
prometheus.GaugeValue, startTimeSeconds,
prometheus.GaugeValue, startTimeSecondsMetric,
)
return nil
}
36 changes: 36 additions & 0 deletions collector/pg_postmaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,39 @@ func TestPgPostmasterCollector(t *testing.T) {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgPostmasterCollectorNullTime(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

mock.ExpectQuery(sanitizeQuery(pgPostmasterQuery)).WillReturnRows(sqlmock.NewRows([]string{"pg_postmaster_start_time"}).
AddRow(nil))

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGPostmasterCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGPostmasterCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{}, value: 0, metricType: dto.MetricType_GAUGE},
}
convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
36 changes: 26 additions & 10 deletions collector/pg_process_idle.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,23 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
)

const processIdleSubsystem = "process_idle"

func init() {
registerCollector(processIdleSubsystem, defaultEnabled, NewPGProcessIdleCollector)
// Making this default disabled because we have no tests for it
registerCollector("statements", defaultDisabled, NewPGProcessIdleCollector)
Sticksman marked this conversation as resolved.
Show resolved Hide resolved
}

type PGProcessIdleCollector struct {
log log.Logger
}

const processIdleSubsystem = "process_idle"

func NewPGProcessIdleCollector(config collectorConfig) (Collector, error) {
return &PGProcessIdleCollector{log: config.logger}, nil
}
Expand All @@ -41,8 +43,8 @@ var pgProcessIdleSeconds = prometheus.NewDesc(
prometheus.Labels{},
)

func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
func (PGProcessIdleCollector) Update(ctx context.Context, inst *instance, ch chan<- prometheus.Metric) error {
db := inst.getDB()
row := db.QueryRowContext(ctx,
`WITH
metrics AS (
Expand Down Expand Up @@ -79,9 +81,9 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
FROM metrics JOIN buckets USING (application_name)
GROUP BY 1, 2, 3;`)

var applicationName string
var secondsSum int64
var secondsCount uint64
var applicationName sql.NullString
var secondsSum sql.NullInt64
var secondsCount sql.NullInt64
var seconds []int64
var secondsBucket []uint64

Expand All @@ -97,10 +99,24 @@ func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch
if err != nil {
return err
}

applicationNameLabel := "unknown"
if applicationName.Valid {
applicationNameLabel = applicationName.String
}

var secondsCountMetric uint64
if secondsCount.Valid {
secondsCountMetric = uint64(secondsCount.Int64)
}
var secondsSumMetric float64
if secondsSum.Valid {
secondsSumMetric = float64(secondsSum.Int64)
}
ch <- prometheus.MustNewConstHistogram(
pgProcessIdleSeconds,
secondsCount, float64(secondsSum), buckets,
applicationName,
secondsCountMetric, secondsSumMetric, buckets,
applicationNameLabel,
)
return nil
}
31 changes: 22 additions & 9 deletions collector/pg_replication_slot.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package collector

import (
"context"
"database/sql"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -82,32 +83,44 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
defer rows.Close()

for rows.Next() {
var slotName string
var walLSN int64
var flushLSN int64
var isActive bool
var slotName sql.NullString
var walLSN sql.NullFloat64
var flushLSN sql.NullFloat64
var isActive sql.NullBool
if err := rows.Scan(&slotName, &walLSN, &flushLSN, &isActive); err != nil {
return err
}

isActiveValue := 0
if isActive {
if isActive.Valid && isActive.Bool {
isActiveValue = 1
SuperQ marked this conversation as resolved.
Show resolved Hide resolved
}
slotNameLabel := "unknown"
if slotName.Valid {
slotNameLabel = slotName.String
}

var walLSNMetric float64
if walLSN.Valid {
walLSNMetric = walLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentWalDesc,
prometheus.GaugeValue, float64(walLSN), slotName,
prometheus.GaugeValue, walLSNMetric, slotNameLabel,
)
if isActive {
if isActive.Valid && isActive.Bool {
var flushLSNMetric float64
if flushLSN.Valid {
flushLSNMetric = flushLSN.Float64
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotCurrentFlushDesc,
prometheus.GaugeValue, float64(flushLSN), slotName,
prometheus.GaugeValue, flushLSNMetric, slotNameLabel,
)
}
ch <- prometheus.MustNewConstMetric(
pgReplicationSlotIsActiveDesc,
prometheus.GaugeValue, float64(isActiveValue), slotName,
prometheus.GaugeValue, float64(isActiveValue), slotNameLabel,
)
}
if err := rows.Err(); err != nil {
Expand Down
81 changes: 81 additions & 0 deletions collector/pg_replication_slot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,84 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
}

}

func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow("test_slot", 6, 12, nil)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"slot_name": "test_slot"}, value: 6, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "test_slot"}, value: 0, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

inst := &instance{db: db}

columns := []string{"slot_name", "current_wal_lsn", "confirmed_flush_lsn", "active"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, true)
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationSlotCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationSlotCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 0, metricType: dto.MetricType_GAUGE},
{labels: labelMap{"slot_name": "unknown"}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}
Loading