Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor mongodb module to align with other modules #1

Open
wants to merge 1 commit into
base: mongodb_multihost_32188
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 11 additions & 17 deletions metricbeat/module/mongodb/collstats/collstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,33 @@ import (
)

func init() {
mb.Registry.MustAddMetricSet("mongodb", "collstats", New,
mb.WithHostParser(mongodb.ParseURL),
mb.DefaultMetricSet(),
)
mb.Registry.MustAddMetricSet("mongodb", "collstats", New, mb.DefaultMetricSet())
}

// Metricset type defines all fields of the Metricset
// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type Metricset struct {
*mongodb.Metricset
type MetricSet struct {
*mongodb.MetricSet
}

// New creates a new instance of the Metricset
// New creates a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms, err := mongodb.NewMetricset(base)
ms, err := mongodb.NewMetricSet(base)
if err != nil {
return nil, fmt.Errorf("could not create mongodb metricset: %w", err)
return nil, err
}

return &Metricset{ms}, nil
return &MetricSet{ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
client, err := mongodb.NewClient(m.Metricset.Config, m.Module().Config().Timeout, 0)
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
client, err := mongodb.NewClient(m.ClientOptions)
if err != nil {
return fmt.Errorf("could not create mongodb client: %w", err)
}
Expand Down Expand Up @@ -112,9 +108,7 @@ func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
continue
}

reporter.Event(mb.Event{
MetricSetFields: event,
})
reporter.Event(mb.Event{MetricSetFields: event})
}

return nil
Expand Down
11 changes: 4 additions & 7 deletions metricbeat/module/mongodb/dbstats/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,22 @@ import (
// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
mb.Registry.MustAddMetricSet("mongodb", "dbstats", New,
mb.WithHostParser(mongodb.ParseURL),
mb.DefaultMetricSet(),
)
mb.Registry.MustAddMetricSet("mongodb", "dbstats", New, mb.DefaultMetricSet())
}

// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type MetricSet struct {
*mongodb.Metricset
*mongodb.MetricSet
}

// New creates a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms, err := mongodb.NewMetricset(base)
ms, err := mongodb.NewMetricSet(base)
if err != nil {
return nil, err
}
Expand All @@ -62,7 +59,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
client, err := mongodb.NewClient(m.Metricset.Config, m.Module().Config().Timeout, 0)
client, err := mongodb.NewClient(m.ClientOptions)
if err != nil {
return fmt.Errorf("could not create mongodb client: %w", err)
}
Expand Down
11 changes: 4 additions & 7 deletions metricbeat/module/mongodb/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,22 @@ import (
)

func init() {
mb.Registry.MustAddMetricSet("mongodb", "metrics", New,
mb.WithHostParser(mongodb.ParseURL),
mb.DefaultMetricSet(),
)
mb.Registry.MustAddMetricSet("mongodb", "metrics", New, mb.DefaultMetricSet())
}

// MetricSet type defines all fields of the MetricSet
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type MetricSet struct {
*mongodb.Metricset
*mongodb.MetricSet
}

// New creates a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms, err := mongodb.NewMetricset(base)
ms, err := mongodb.NewMetricSet(base)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +55,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
client, err := mongodb.NewClient(m.Metricset.Config, m.Module().Config().Timeout, 0)
client, err := mongodb.NewClient(m.ClientOptions)
if err != nil {
return fmt.Errorf("could not create mongodb client: %w", err)
}
Expand Down
111 changes: 36 additions & 75 deletions metricbeat/module/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,13 @@ package mongodb
import (
"context"
"fmt"
"net/url"
"strings"
"time"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/mb/parse"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)
Expand Down Expand Up @@ -59,9 +56,10 @@ type ModuleConfig struct {
} `config:"credentials"`
}

type Metricset struct {
type MetricSet struct {
mb.BaseMetricSet
Config ModuleConfig
Config ModuleConfig
ClientOptions *options.ClientOptions
}

type module struct {
Expand All @@ -74,72 +72,17 @@ func NewModule(base mb.BaseModule) (mb.Module, error) {
return &module{base}, nil
}

func NewMetricset(base mb.BaseMetricSet) (*Metricset, error) {
func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
// Validate that at least one host has been specified.
config := ModuleConfig{}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, fmt.Errorf("could not read config: %w", err)
}

return &Metricset{Config: config, BaseMetricSet: base}, nil
}

// ParseURL parses valid MongoDB URL strings into an mb.HostData instance
func ParseURL(module mb.Module, host string) (mb.HostData, error) {
c := struct {
Username string `config:"username"`
Password string `config:"password"`
Credentials struct {
AuthMechanism string `config:"auth_mechanism"`
AuthMechanismProperties map[string]string `config:"auth_mechanism_properties"`
AuthSource string `config:"auth_source"`
PasswordSet bool `config:"password_set"`
} `config:"credentials"`
}{}
if err := module.UnpackConfig(&c); err != nil {
return mb.HostData{}, err
}

if parts := strings.SplitN(host, "://", 2); len(parts) != 2 {
// Add scheme.
host = fmt.Sprintf("mongodb://%s", host)
}

// This doesn't use URLHostParserBuilder because MongoDB URLs can contain
// multiple hosts separated by commas (mongodb://host1,host2,host3?options).
u, err := url.Parse(host)
if err != nil {
return mb.HostData{}, fmt.Errorf("error parsing URL: %w", err)
}

parse.SetURLUser(u, c.Username, c.Password)

clientOptions := options.Client()
clientOptions.Auth = &options.Credential{
AuthMechanism: c.Credentials.AuthMechanism,
AuthMechanismProperties: c.Credentials.AuthMechanismProperties,
AuthSource: c.Credentials.AuthSource,
PasswordSet: c.Credentials.PasswordSet,
Username: c.Username,
Password: c.Password,
}

clientOptions.ApplyURI(host)

// https://docs.mongodb.com/manual/reference/connection-string/
_, err = url.Parse(clientOptions.GetURI())
if err != nil {
return mb.HostData{}, fmt.Errorf("error parsing URL: %w", err)
}

return parse.NewHostDataFromURL(u), nil
}

func NewClient(config ModuleConfig, timeout time.Duration, mode readpref.Mode) (*mongo.Client, error) {
clientOptions := options.Client()

// options.Credentials must be nil for the driver to work properly if no auth is provided. Zero values breaks
// the connnection
// options.Credentials must be nil for the driver to work properly if no auth is provided.
// Zero values breaks the connnection.
if config.Username != "" && config.Password != "" {
clientOptions.Auth = &options.Credential{
AuthMechanism: config.Credentials.AuthMechanism,
Expand All @@ -155,33 +98,51 @@ func NewClient(config ModuleConfig, timeout time.Duration, mode readpref.Mode) (
}
}

if mode == 0 {
mode = readpref.NearestMode
if config.TLS.IsEnabled() {
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, fmt.Errorf("could not load provided TLS configuration: %w", err)
}

clientOptions.SetTLSConfig(tlsConfig.ToConfig())
}

host := base.Host()
if parts := strings.SplitN(host, "://", 2); len(parts) != 2 {
host = "mongodb://" + host // add scheme
}
if err := clientOptions.ApplyURI(host).Validate(); err != nil {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ApplyURI is purposedly put here because as per the documentation, any methods on clientOptions i.e, of SetXXX form overrides the URI parameters (if any) and vice-versa. So it would give the user freedom to set as desired parameters using the URI itself and it overrides the values of the fields that were set using SetXXX

return nil, fmt.Errorf("URL validation error: %w", err)
}

readPreference, err := readpref.New(mode)
readPreference, err := readpref.New(readpref.PrimaryMode)
if err != nil {
return nil, err
}

clientOptions.SetReadPreference(readPreference)
clientOptions.SetConnectTimeout(timeout)

if config.TLS.IsEnabled() {
tlsConfig, err := tlscommon.LoadTLSConfig(config.TLS)
if err != nil {
return nil, fmt.Errorf("could not load provided TLS configuration: %w", err)
}
// TODO(SS): Decide a better timeout? Currently, mb.DefaultModuleConfig().Timeout
// will make the client use no timeout. If we do not call this method, then by default,
// it is set to 30s.
clientOptions.SetConnectTimeout(mb.DefaultModuleConfig().Timeout)

clientOptions.SetTLSConfig(tlsConfig.ToConfig())
if err := clientOptions.Validate(); err != nil {
return nil, fmt.Errorf("client options validation failed: %w", err)
}

client, err := mongo.NewClient(clientOptions)
return &MetricSet{Config: config, ClientOptions: clientOptions, BaseMetricSet: base}, nil
}

func NewClient(opts *options.ClientOptions) (*mongo.Client, error) {
client, err := mongo.NewClient(opts)
if err != nil {
return nil, fmt.Errorf("could not create mongodb client: %w", err)
}

if err = client.Connect(context.Background()); err != nil {
return client, fmt.Errorf("could not connect to mongodb: %w", err)
return nil, fmt.Errorf("could not connect to mongodb: %w", err)
}

return client, nil
}
Loading