Skip to content

Commit

Permalink
migrate postgres to ReporterV2 with new error handling (elastic#11636)
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry authored Apr 4, 2019
1 parent 4e09a44 commit 2ec788f
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 91 deletions.
16 changes: 5 additions & 11 deletions metricbeat/module/postgresql/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.activity")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -56,21 +53,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_activity")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}

for _, result := range results {
Expand All @@ -79,4 +71,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/activity/activity_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand All @@ -59,14 +59,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
22 changes: 6 additions & 16 deletions metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"database/sql"
"fmt"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -32,8 +30,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.bgwriter")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -56,31 +52,25 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}
if len(results) == 0 {
err = fmt.Errorf("No results from the pg_stat_bgwriter query")
logger.Error(err)
reporter.Error(err)
return
return fmt.Errorf("No results from the pg_stat_bgwriter query")
}

data, _ := schema.Apply(results[0])
reporter.Event(mb.Event{
MetricSetFields: data,
})

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/bgwriter/bgwriter_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -64,14 +64,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
17 changes: 5 additions & 12 deletions metricbeat/module/postgresql/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package database
import (
"database/sql"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -31,8 +29,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.database")

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
Expand All @@ -55,21 +51,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in QueryStats")
}

for _, result := range results {
Expand All @@ -78,4 +69,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
14 changes: 4 additions & 10 deletions metricbeat/module/postgresql/database/database_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand All @@ -61,14 +61,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func init() {
}
}

//NewModule returns a new instance of the module
func NewModule(base mb.BaseModule) (mb.Module, error) {
// Validate that at least one host has been specified.
config := struct {
Expand All @@ -54,6 +55,7 @@ func NewModule(base mb.BaseModule) (mb.Module, error) {
return &base, nil
}

//ParseURL is the postgres host parser
func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {
c := struct {
Username string `config:"username"`
Expand Down Expand Up @@ -102,6 +104,7 @@ func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {
return h, nil
}

//QueryStats makes the database call for a given metric
func QueryStats(db *sql.DB, query string) ([]map[string]interface{}, error) {
rows, err := db.Query(query)
if err != nil {
Expand Down
17 changes: 5 additions & 12 deletions metricbeat/module/postgresql/statement/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package statement
import (
"database/sql"

"github.com/elastic/beats/libbeat/logp"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
Expand All @@ -31,8 +29,6 @@ import (
_ "github.com/lib/pq"
)

var logger = logp.NewLogger("postgresql.statement")

// init registers the MetricSet with the central registry as soon as the program
// starts. The New function will be called later to instantiate an instance of
// the MetricSet for each host defined in the module's configuration. After the
Expand Down Expand Up @@ -67,21 +63,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements")
if err != nil {
err = errors.Wrap(err, "QueryStats")
logger.Error(err)
reporter.Error(err)
return
return errors.Wrap(err, "QueryStats")
}

for _, result := range results {
Expand All @@ -90,4 +81,6 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) {
MetricSetFields: data,
})
}

return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
func TestFetch(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
Expand Down Expand Up @@ -92,14 +92,8 @@ func TestFetch(t *testing.T) {
func TestData(t *testing.T) {
compose.EnsureUp(t, "postgresql")

f := mbtest.NewReportingMetricSetV2(t, getConfig())
events, errs := mbtest.ReportingFetchV2(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)

if err := mbtest.WriteEventsReporterV2(f, t, ""); err != nil {
f := mbtest.NewReportingMetricSetV2Error(t, getConfig())
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
}
}
Expand Down
3 changes: 3 additions & 0 deletions metricbeat/module/postgresql/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@ package postgresql

import "os"

//GetEnvDSN returns the Data Source Name
func GetEnvDSN() string {
return os.Getenv("POSTGRESQL_DSN")
}

//GetEnvUsername returns the username
func GetEnvUsername() string {
return os.Getenv("POSTGRESQL_USERNAME")
}

//GetEnvPassword returns the password
func GetEnvPassword() string {
return os.Getenv("POSTGRESQL_PASSWORD")
}

0 comments on commit 2ec788f

Please sign in to comment.