From ee2b0828f14b5942a5c62065e927ff66ffa2b248 Mon Sep 17 00:00:00 2001 From: James Lawrence Date: Sat, 22 Apr 2017 08:38:35 -0400 Subject: [PATCH] make sql.DB connection details configurable --- CHANGELOG.md | 7 +- plugins/inputs/postgresql/connect.go | 77 ---------- plugins/inputs/postgresql/postgresql.go | 76 ++++------ plugins/inputs/postgresql/postgresql_test.go | 74 +++++---- plugins/inputs/postgresql/service.go | 142 ++++++++++++++++++ .../postgresql_extensible.go | 82 ++++------ .../postgresql_extensible_test.go | 26 ++-- 7 files changed, 277 insertions(+), 207 deletions(-) delete mode 100644 plugins/inputs/postgresql/connect.go create mode 100644 plugins/inputs/postgresql/service.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c2ddc9d27576c..be7cb74656ae2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -39,6 +39,10 @@ for receiving and sending UDP, TCP, unix, & unix-datagram data. These plugins will replace udp_listener and tcp_listener, which are still available but will be deprecated eventually. +- Postgresql plugins will now default to using a persistent connection to the database. +if you wish you retain the previous functionality set the max_lifetime configuration to less +than the collection interval. + ### Features - [#2494](https://github.com/influxdata/telegraf/pull/2494): Add interrupts input plugin. @@ -73,7 +77,8 @@ be deprecated eventually. - [#2493](https://github.com/influxdata/telegraf/pull/2493): Check signature in the GitHub webhook plugin - [#2038](https://github.com/influxdata/telegraf/issues/2038): Add papertrail support to webhooks - [#2253](https://github.com/influxdata/telegraf/pull/2253): Change jolokia plugin to use bulk requests. -- [#2575](https://github.com/influxdata/telegraf/issues/2575) Add diskio input for Darwin +- [#2575](https://github.com/influxdata/telegraf/issues/2575): Add diskio input for Darwin +- [#1977](https://github.com/influxdata/telegraf/issues/1977): make postgresql connection pool persist between intervals. ### Bugfixes diff --git a/plugins/inputs/postgresql/connect.go b/plugins/inputs/postgresql/connect.go deleted file mode 100644 index 011ae32e003b2..0000000000000 --- a/plugins/inputs/postgresql/connect.go +++ /dev/null @@ -1,77 +0,0 @@ -package postgresql - -import ( - "fmt" - "net" - "net/url" - "sort" - "strings" -) - -// pulled from lib/pq -// ParseURL no longer needs to be used by clients of this library since supplying a URL as a -// connection string to sql.Open() is now supported: -// -// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full") -// -// It remains exported here for backwards-compatibility. -// -// ParseURL converts a url to a connection string for driver.Open. -// Example: -// -// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full" -// -// converts to: -// -// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full" -// -// A minimal example: -// -// "postgres://" -// -// This will be blank, causing driver.Open to use all of the defaults -func ParseURL(uri string) (string, error) { - u, err := url.Parse(uri) - if err != nil { - return "", err - } - - if u.Scheme != "postgres" && u.Scheme != "postgresql" { - return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) - } - - var kvs []string - escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) - accrue := func(k, v string) { - if v != "" { - kvs = append(kvs, k+"="+escaper.Replace(v)) - } - } - - if u.User != nil { - v := u.User.Username() - accrue("user", v) - - v, _ = u.User.Password() - accrue("password", v) - } - - if host, port, err := net.SplitHostPort(u.Host); err != nil { - accrue("host", u.Host) - } else { - accrue("host", host) - accrue("port", port) - } - - if u.Path != "" { - accrue("dbname", u.Path[1:]) - } - - q := u.Query() - for k := range q { - accrue(k, q.Get(k)) - } - - sort.Strings(kvs) // Makes testing easier (not a performance concern) - return strings.Join(kvs, " "), nil -} diff --git a/plugins/inputs/postgresql/postgresql.go b/plugins/inputs/postgresql/postgresql.go index 832c433ed9fdf..274ebb42174f9 100644 --- a/plugins/inputs/postgresql/postgresql.go +++ b/plugins/inputs/postgresql/postgresql.go @@ -2,9 +2,7 @@ package postgresql import ( "bytes" - "database/sql" "fmt" - "regexp" "sort" "strings" @@ -12,16 +10,16 @@ import ( _ "github.com/jackc/pgx/stdlib" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" ) type Postgresql struct { - Address string + Service Databases []string IgnoredDatabases []string OrderedColumns []string AllColumns []string - sanitizedAddress string } var ignoredColumns = map[string]bool{"stats_reset": true} @@ -41,6 +39,15 @@ var sampleConfig = ` ## to grab metrics for. ## address = "host=localhost user=postgres sslmode=disable" + ## A custom name for the database that will be used as the "server" tag in the + ## measurement output. If not specified, a default one generated from + ## the connection address is used. + # outputaddress = "db01" + + ## connection configuration. + ## maxlifetime - specify the maximum lifetime of a connection. + ## default is forever (0s) + max_lifetime = "0s" ## A list of databases to explicitly ignore. If not specified, metrics for all ## databases are gathered. Do NOT use with the 'databases' option. @@ -63,23 +70,15 @@ func (p *Postgresql) IgnoredColumns() map[string]bool { return ignoredColumns } -var localhost = "host=localhost sslmode=disable" - func (p *Postgresql) Gather(acc telegraf.Accumulator) error { var ( err error - db *sql.DB query string ) - if p.Address == "" || p.Address == "localhost" { - p.Address = localhost - } - - if db, err = sql.Open("pgx", p.Address); err != nil { + if err = p.DB.Ping(); err != nil { return err } - defer db.Close() if len(p.Databases) == 0 && len(p.IgnoredDatabases) == 0 { query = `SELECT * FROM pg_stat_database` @@ -91,7 +90,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { strings.Join(p.Databases, "','")) } - rows, err := db.Query(query) + rows, err := p.DB.Query(query) if err != nil { return err } @@ -99,14 +98,13 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { defer rows.Close() // grab the column information from the result - p.OrderedColumns, err = rows.Columns() - if err != nil { + if p.OrderedColumns, err = rows.Columns(); err != nil { return err - } else { - p.AllColumns = make([]string, len(p.OrderedColumns)) - copy(p.AllColumns, p.OrderedColumns) } + p.AllColumns = make([]string, len(p.OrderedColumns)) + copy(p.AllColumns, p.OrderedColumns) + for rows.Next() { err = p.accRow(rows, acc) if err != nil { @@ -116,7 +114,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { query = `SELECT * FROM pg_stat_bgwriter` - bg_writer_row, err := db.Query(query) + bg_writer_row, err := p.DB.Query(query) if err != nil { return err } @@ -124,13 +122,12 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { defer bg_writer_row.Close() // grab the column information from the result - p.OrderedColumns, err = bg_writer_row.Columns() - if err != nil { + if p.OrderedColumns, err = bg_writer_row.Columns(); err != nil { return err - } else { - for _, v := range p.OrderedColumns { - p.AllColumns = append(p.AllColumns, v) - } + } + + for _, v := range p.OrderedColumns { + p.AllColumns = append(p.AllColumns, v) } for bg_writer_row.Next() { @@ -147,23 +144,6 @@ type scanner interface { Scan(dest ...interface{}) error } -var passwordKVMatcher, _ = regexp.Compile("password=\\S+ ?") - -func (p *Postgresql) SanitizedAddress() (_ string, err error) { - var canonicalizedAddress string - if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = ParseURL(p.Address) - if err != nil { - return p.sanitizedAddress, err - } - } else { - canonicalizedAddress = p.Address - } - p.sanitizedAddress = passwordKVMatcher.ReplaceAllString(canonicalizedAddress, "") - - return p.sanitizedAddress, err -} - func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { var columnVars []interface{} var dbname bytes.Buffer @@ -215,6 +195,14 @@ func (p *Postgresql) accRow(row scanner, acc telegraf.Accumulator) error { func init() { inputs.Add("postgresql", func() telegraf.Input { - return &Postgresql{} + return &Postgresql{ + Service: Service{ + MaxIdle: 1, + MaxOpen: 1, + MaxLifetime: internal.Duration{ + Duration: 0, + }, + }, + } }) } diff --git a/plugins/inputs/postgresql/postgresql_test.go b/plugins/inputs/postgresql/postgresql_test.go index a0690961d0cb3..e627ad4daf6bd 100644 --- a/plugins/inputs/postgresql/postgresql_test.go +++ b/plugins/inputs/postgresql/postgresql_test.go @@ -15,14 +15,18 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, Databases: []string{"postgres"}, } var acc testutil.Accumulator - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) availableColumns := make(map[string]bool) for _, col := range p.AllColumns { @@ -113,15 +117,19 @@ func TestPostgresqlTagsMetricsWithDatabaseName(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, Databases: []string{"postgres"}, } var acc testutil.Accumulator - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) point, ok := acc.Get("postgresql") require.True(t, ok) @@ -135,14 +143,18 @@ func TestPostgresqlDefaultsToAllDatabases(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, } var acc testutil.Accumulator - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) var found bool @@ -164,14 +176,17 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, } var acc testutil.Accumulator - - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) for col := range p.IgnoredColumns() { assert.False(t, acc.HasMeasurement(col)) @@ -184,15 +199,19 @@ func TestPostgresqlDatabaseWhitelistTest(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, Databases: []string{"template0"}, } var acc testutil.Accumulator - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) var foundTemplate0 = false var foundTemplate1 = false @@ -220,15 +239,18 @@ func TestPostgresqlDatabaseBlacklistTest(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, IgnoredDatabases: []string{"template0"}, } var acc testutil.Accumulator - - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) var foundTemplate0 = false var foundTemplate1 = false diff --git a/plugins/inputs/postgresql/service.go b/plugins/inputs/postgresql/service.go new file mode 100644 index 0000000000000..4f7b21e549cf5 --- /dev/null +++ b/plugins/inputs/postgresql/service.go @@ -0,0 +1,142 @@ +package postgresql + +import ( + "database/sql" + "fmt" + "net" + "net/url" + "regexp" + "sort" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" +) + +// pulled from lib/pq +// ParseURL no longer needs to be used by clients of this library since supplying a URL as a +// connection string to sql.Open() is now supported: +// +// sql.Open("postgres", "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full") +// +// It remains exported here for backwards-compatibility. +// +// ParseURL converts a url to a connection string for driver.Open. +// Example: +// +// "postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full" +// +// converts to: +// +// "user=bob password=secret host=1.2.3.4 port=5432 dbname=mydb sslmode=verify-full" +// +// A minimal example: +// +// "postgres://" +// +// This will be blank, causing driver.Open to use all of the defaults +func parseURL(uri string) (string, error) { + u, err := url.Parse(uri) + if err != nil { + return "", err + } + + if u.Scheme != "postgres" && u.Scheme != "postgresql" { + return "", fmt.Errorf("invalid connection protocol: %s", u.Scheme) + } + + var kvs []string + escaper := strings.NewReplacer(` `, `\ `, `'`, `\'`, `\`, `\\`) + accrue := func(k, v string) { + if v != "" { + kvs = append(kvs, k+"="+escaper.Replace(v)) + } + } + + if u.User != nil { + v := u.User.Username() + accrue("user", v) + + v, _ = u.User.Password() + accrue("password", v) + } + + if host, port, err := net.SplitHostPort(u.Host); err != nil { + accrue("host", u.Host) + } else { + accrue("host", host) + accrue("port", port) + } + + if u.Path != "" { + accrue("dbname", u.Path[1:]) + } + + q := u.Query() + for k := range q { + accrue(k, q.Get(k)) + } + + sort.Strings(kvs) // Makes testing easier (not a performance concern) + return strings.Join(kvs, " "), nil +} + +// Service common functionality shared between the postgresql and postgresql_extensible +// packages. +type Service struct { + Address string + Outputaddress string + MaxIdle int + MaxOpen int + MaxLifetime internal.Duration + DB *sql.DB +} + +// Start starts the ServiceInput's service, whatever that may be +func (p *Service) Start(telegraf.Accumulator) (err error) { + const localhost = "host=localhost sslmode=disable" + + if p.Address == "" || p.Address == "localhost" { + p.Address = localhost + } + + if p.DB, err = sql.Open("pgx", p.Address); err != nil { + return err + } + + p.DB.SetMaxOpenConns(p.MaxOpen) + p.DB.SetMaxIdleConns(p.MaxIdle) + p.DB.SetConnMaxLifetime(p.MaxLifetime.Duration) + + return nil +} + +// Stop stops the services and closes any necessary channels and connections +func (p *Service) Stop() { + p.DB.Close() +} + +var kvMatcher, _ = regexp.Compile("(password|sslcert|sslkey|sslmode|sslrootcert)=\\S+ ?") + +// SanitizedAddress utility function to strip sensitive information from the connection string. +func (p *Service) SanitizedAddress() (sanitizedAddress string, err error) { + var ( + canonicalizedAddress string + ) + + if p.Outputaddress != "" { + return p.Outputaddress, nil + } + + if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { + if canonicalizedAddress, err = parseURL(p.Address); err != nil { + return sanitizedAddress, err + } + } else { + canonicalizedAddress = p.Address + } + + sanitizedAddress = kvMatcher.ReplaceAllString(canonicalizedAddress, "") + + return sanitizedAddress, err +} diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible.go b/plugins/inputs/postgresql_extensible/postgresql_extensible.go index b8d3be625c8e5..3ceec1122997a 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible.go @@ -2,29 +2,26 @@ package postgresql_extensible import ( "bytes" - "database/sql" "fmt" "log" - "regexp" "strings" // register in driver. _ "github.com/jackc/pgx/stdlib" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/plugins/inputs" "github.com/influxdata/telegraf/plugins/inputs/postgresql" ) type Postgresql struct { - Address string - Outputaddress string - Databases []string - OrderedColumns []string - AllColumns []string - AdditionalTags []string - sanitizedAddress string - Query []struct { + postgresql.Service + Databases []string + OrderedColumns []string + AllColumns []string + AdditionalTags []string + Query []struct { Sqlquery string Version int Withdbname bool @@ -58,14 +55,20 @@ var sampleConfig = ` ## to grab metrics for. # address = "host=localhost user=postgres sslmode=disable" + + ## connection configuration. + ## maxlifetime - specify the maximum lifetime of a connection. + ## default is forever (0s) + max_lifetime = "0s" + ## A list of databases to pull metrics about. If not specified, metrics for all ## databases are gathered. ## databases = ["app_production", "testing"] # - # outputaddress = "db01" ## A custom name for the database that will be used as the "server" tag in the ## measurement output. If not specified, a default one generated from ## the connection address is used. + # outputaddress = "db01" # ## Define the toml config where the sql queries are stored ## New queries can be added, if the withdbname is set to true and there is no @@ -113,12 +116,9 @@ func (p *Postgresql) IgnoredColumns() map[string]bool { return ignoredColumns } -var localhost = "host=localhost sslmode=disable" - func (p *Postgresql) Gather(acc telegraf.Accumulator) error { var ( err error - db *sql.DB sql_query string query_addon string db_version int @@ -127,20 +127,14 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { meas_name string ) - if p.Address == "" || p.Address == "localhost" { - p.Address = localhost - } - - if db, err = sql.Open("pgx", p.Address); err != nil { + if err = p.DB.Ping(); err != nil { return err } - defer db.Close() // Retreiving the database version query = `select substring(setting from 1 for 3) as version from pg_settings where name='server_version_num'` - err = db.QueryRow(query).Scan(&db_version) - if err != nil { + if err = p.DB.QueryRow(query).Scan(&db_version); err != nil { return err } // We loop in order to process each query @@ -168,7 +162,7 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { sql_query += query_addon if p.Query[i].Version <= db_version { - rows, err := db.Query(sql_query) + rows, err := p.DB.Query(sql_query) if err != nil { return err } @@ -176,14 +170,14 @@ func (p *Postgresql) Gather(acc telegraf.Accumulator) error { defer rows.Close() // grab the column information from the result - p.OrderedColumns, err = rows.Columns() - if err != nil { + if p.OrderedColumns, err = rows.Columns(); err != nil { return err - } else { - for _, v := range p.OrderedColumns { - p.AllColumns = append(p.AllColumns, v) - } } + + for _, v := range p.OrderedColumns { + p.AllColumns = append(p.AllColumns, v) + } + p.AdditionalTags = nil if tag_value != "" { tag_list := strings.Split(tag_value, ",") @@ -207,26 +201,6 @@ type scanner interface { Scan(dest ...interface{}) error } -var KVMatcher, _ = regexp.Compile("(password|sslcert|sslkey|sslmode|sslrootcert)=\\S+ ?") - -func (p *Postgresql) SanitizedAddress() (_ string, err error) { - if p.Outputaddress != "" { - return p.Outputaddress, nil - } - var canonicalizedAddress string - if strings.HasPrefix(p.Address, "postgres://") || strings.HasPrefix(p.Address, "postgresql://") { - canonicalizedAddress, err = postgresql.ParseURL(p.Address) - if err != nil { - return p.sanitizedAddress, err - } - } else { - canonicalizedAddress = p.Address - } - p.sanitizedAddress = KVMatcher.ReplaceAllString(canonicalizedAddress, "") - - return p.sanitizedAddress, err -} - func (p *Postgresql) accRow(meas_name string, row scanner, acc telegraf.Accumulator) error { var columnVars []interface{} var dbname bytes.Buffer @@ -304,6 +278,14 @@ COLUMN: func init() { inputs.Add("postgresql_extensible", func() telegraf.Input { - return &Postgresql{} + return &Postgresql{ + Service: postgresql.Service{ + MaxIdle: 1, + MaxOpen: 1, + MaxLifetime: internal.Duration{ + Duration: 0, + }, + }, + } }) } diff --git a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go index f92284ee43929..1ebd0cf0c4b43 100644 --- a/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go +++ b/plugins/inputs/postgresql_extensible/postgresql_extensible_test.go @@ -4,6 +4,7 @@ import ( "fmt" "testing" + "github.com/influxdata/telegraf/plugins/inputs/postgresql" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -15,8 +16,12 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: postgresql.Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, Databases: []string{"postgres"}, Query: query{ {Sqlquery: "select * from pg_stat_database", @@ -26,8 +31,8 @@ func TestPostgresqlGeneratesMetrics(t *testing.T) { }, } var acc testutil.Accumulator - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) availableColumns := make(map[string]bool) for _, col := range p.AllColumns { @@ -108,14 +113,17 @@ func TestPostgresqlIgnoresUnwantedColumns(t *testing.T) { } p := &Postgresql{ - Address: fmt.Sprintf("host=%s user=postgres sslmode=disable", - testutil.GetLocalHost()), + Service: postgresql.Service{ + Address: fmt.Sprintf( + "host=%s user=postgres sslmode=disable", + testutil.GetLocalHost(), + ), + }, } var acc testutil.Accumulator - - err := p.Gather(&acc) - require.NoError(t, err) + require.NoError(t, p.Start(&acc)) + require.NoError(t, p.Gather(&acc)) for col := range p.IgnoredColumns() { assert.False(t, acc.HasMeasurement(col))