Skip to content

Commit

Permalink
Keep a pool of connections in postgresql module (#12603)
Browse files Browse the repository at this point in the history
Reuse connections in PostgreSQL metricsets instead of opening and
closing one on each fetch.

Fixes #12504
  • Loading branch information
jsoriano authored Jun 20, 2019
1 parent fd4058b commit 400c67e
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 93 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix an issue listing all processes when run under Windows as a non-privileged user. {issue}12301[12301] {pull}12475[12475]
- The `elasticsearch/index_summary` metricset gracefully handles an empty Elasticsearch cluster when `xpack.enabled: true` is set. {pull}12489[12489] {issue}12487[12487]
- When TLS is configured for the http metricset and a `certificate_authorities` is configured we now default to `required` for the `client_authentication`. {pull}12584[12584]
- Reuse connections in PostgreSQL metricsets. {issue}12504[12504] {pull}12603[12603]

*Packetbeat*

Expand Down
22 changes: 9 additions & 13 deletions metricbeat/module/postgresql/activity/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package activity

import (
"database/sql"
"context"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry.
Expand All @@ -40,27 +37,26 @@ func init() {

// MetricSet type defines all fields of the Postgresql MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_activity")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_activity")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
22 changes: 9 additions & 13 deletions metricbeat/module/postgresql/bgwriter/bgwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@
package bgwriter

import (
"database/sql"
"context"
"fmt"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry.
Expand All @@ -41,25 +38,24 @@ func init() {

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_bgwriter")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_bgwriter")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
19 changes: 9 additions & 10 deletions metricbeat/module/postgresql/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package database

import (
"database/sql"
"context"

"github.com/pkg/errors"

Expand All @@ -40,25 +40,24 @@ func init() {

// MetricSet type defines all fields of the MetricSet
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New create a new instance of the postgresql database MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_database")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_database")
if err != nil {
return errors.Wrap(err, "error in QueryStats")
}
Expand Down
104 changes: 104 additions & 0 deletions metricbeat/module/postgresql/metricset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package postgresql

import (
"context"
"database/sql"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

type MetricSet struct {
mb.BaseMetricSet

db *sql.DB
}

// NewMetricSet creates a PostgreSQL metricset with a pool of connections
func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
return &MetricSet{BaseMetricSet: base}, nil
}

// DB creates a database connection, it must be freed after use with `Close()`
func (ms *MetricSet) DB(ctx context.Context) (*sql.Conn, error) {
if ms.db == nil {
db, err := sql.Open("postgres", ms.HostData().URI)
if err != nil {
return nil, errors.Wrap(err, "failed to open connection")
}
ms.db = db
}
return ms.db.Conn(ctx)
}

//QueryStats makes the database call for a given metric
func (ms *MetricSet) QueryStats(ctx context.Context, query string) ([]map[string]interface{}, error) {
db, err := ms.DB(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to obtain a connection with the database")
}
defer db.Close()

rows, err := db.QueryContext(ctx, query)
if err != nil {
return nil, errors.Wrap(err, "failed to query database")
}

columns, err := rows.Columns()
if err != nil {
return nil, errors.Wrap(err, "scanning columns")
}
vals := make([][]byte, len(columns))
valPointers := make([]interface{}, len(columns))
for i := range vals {
valPointers[i] = &vals[i]
}

results := []map[string]interface{}{}

for rows.Next() {
err = rows.Scan(valPointers...)
if err != nil {
return nil, errors.Wrap(err, "scanning row")
}

result := map[string]interface{}{}
for i, col := range columns {
result[col] = string(vals[i])
}

logp.Debug("postgresql", "Result: %v", result)
results = append(results, result)
}
return results, nil
}

// Close closes the metricset and its connections
func (ms *MetricSet) Close() error {
if ms.db == nil {
return nil
}
return errors.Wrap(ms.db.Close(), "failed to close connection")
}
39 changes: 0 additions & 39 deletions metricbeat/module/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ Package postgresql is Metricbeat module for PostgreSQL server.
package postgresql

import (
"database/sql"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/lib/pq"
"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
)
Expand Down Expand Up @@ -103,39 +100,3 @@ func ParseURL(mod mb.Module, rawURL string) (mb.HostData, error) {

return h, nil
}

//QueryStats makes the database call for a given metric
func QueryStats(db *sql.DB, query string) ([]map[string]interface{}, error) {
rows, err := db.Query(query)
if err != nil {
return nil, err
}

columns, err := rows.Columns()
if err != nil {
return nil, errors.Wrap(err, "scanning columns")
}
vals := make([][]byte, len(columns))
valPointers := make([]interface{}, len(columns))
for i := range vals {
valPointers[i] = &vals[i]
}

results := []map[string]interface{}{}

for rows.Next() {
err = rows.Scan(valPointers...)
if err != nil {
return nil, errors.Wrap(err, "scanning row")
}

result := map[string]interface{}{}
for i, col := range columns {
result[col] = string(vals[i])
}

logp.Debug("postgresql", "Result: %v", result)
results = append(results, result)
}
return results, nil
}
25 changes: 7 additions & 18 deletions metricbeat/module/postgresql/statement/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@
package statement

import (
"database/sql"
"context"

"github.com/pkg/errors"

"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/postgresql"

// Register postgresql database/sql driver
_ "github.com/lib/pq"
)

// init registers the MetricSet with the central registry as soon as the program
Expand All @@ -44,33 +41,25 @@ func init() {
// mb.BaseMetricSet because it implements all of the required mb.MetricSet
// interface methods except for Fetch.
type MetricSet struct {
mb.BaseMetricSet
*postgresql.MetricSet
}

// New creates a new instance of the MetricSet. New is responsible for unpacking
// any MetricSet specific configuration options if there are any.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
ms, err := postgresql.NewMetricSet(base)
if err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
}, nil
return &MetricSet{MetricSet: ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
db, err := sql.Open("postgres", m.HostData().URI)
if err != nil {
return errors.Wrap(err, "error in Open")
}
defer db.Close()

results, err := postgresql.QueryStats(db, "SELECT * FROM pg_stat_statements")
ctx := context.Background()
results, err := m.QueryStats(ctx, "SELECT * FROM pg_stat_statements")
if err != nil {
return errors.Wrap(err, "QueryStats")
}
Expand Down

0 comments on commit 400c67e

Please sign in to comment.