Skip to content

Commit

Permalink
Merge pull request #1 from scottcrespo/metricbeat-mongodb-persistent-…
Browse files Browse the repository at this point in the history
…connections

Metricbeat mongodb persistent connections
  • Loading branch information
Scott Crespo authored Dec 29, 2016
2 parents 590b9e2 + aafaafe commit 663b4da
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 60 deletions.
23 changes: 22 additions & 1 deletion metricbeat/module/mongodb/dbstats/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
dbstats provides an overview of a particular mongo database. This document
is most concerned with data volumes of a database.
fields:
- name: avg_object_size
- name: avg_obj_size
type: long

- name: collections
Expand Down Expand Up @@ -33,3 +33,24 @@

- name: storage_size
type: long

- name: ns_size_mb
type: long

- name: data_file_version
type: group
fields:
- name: major
type: long

- name: minor
type: long

- name: extent_free_list
type: group
fields:
- name: num
type: long

- name: size
type: long
33 changes: 23 additions & 10 deletions metricbeat/module/mongodb/dbstats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,29 @@ import (
)

var schema = s.Schema{
"db": c.Str("db"),
"collection": c.Int("collections"),
"objects": c.Int("objects"),
"avg_object_size": c.Int("avgObjectSize"),
"data_size": c.Int("dataSize"),
"storage_size": c.Int("storageSize"),
"num_extents": c.Int("numExtents"),
"indexes": c.Int("indexes"),
"index_size": c.Int("indexSize"),
"file_size": c.Int("fileSize"),
"db": c.Str("db"),
"collections": c.Int("collections"),
"objects": c.Int("objects"),
"avg_obj_size": c.Int("avgObjSize"),
"data_size": c.Int("dataSize"),
"storage_size": c.Int("storageSize"),
"num_extents": c.Int("numExtents"),
"indexes": c.Int("indexes"),
"index_size": c.Int("indexSize"),
// mmapv1 only
"ns_size_mb": c.Int("nsSizeMB"),
// mmapv1 only
"file_size": c.Int("fileSize"),
// mmapv1 only
"data_file_version": c.Dict("dataFileVersion", s.Schema{
"major": c.Int("major"),
"minor": c.Int("minor"),
}),
// mmapv1 only
"extent_free_list": c.Dict("extentFreeList", s.Schema{
"num": c.Int("num"),
"size": c.Int("size"),
}),
}

var eventMapping = schema.Apply
77 changes: 57 additions & 20 deletions metricbeat/module/mongodb/dbstats/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package dbstats

import (
"errors"
"fmt"
"sync"

"github.com/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/mongodb"
"gopkg.in/mgo.v2"
)

// init registers the MetricSet with the central registry.
// The New method will be called after the setup of the module and before starting to fetch data
func init() {
if err := mb.Registry.AddMetricSet("mongodb", "dbstats", New); err != nil {
if err := mb.Registry.AddMetricSet("mongodb", "dbstats", New, mongodb.ParseURL); err != nil {
panic(err)
}
}
Expand All @@ -23,10 +26,11 @@ func init() {
// multiple fetch calls.
type MetricSet struct {
mb.BaseMetricSet
dialInfo *mgo.DialInfo
dialInfo *mgo.DialInfo
mongoSessions []*mgo.Session
}

// New create a new instance of the MetricSet
// New creates a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
Expand All @@ -36,9 +40,16 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}
dialInfo.Timeout = base.Module().Config().Timeout

// instantiate direct connections to each of the configured Mongo hosts
mongoSessions, err := mongodb.NewDirectSessions(dialInfo.Addrs, dialInfo)
if err != nil {
return nil, err
}

return &MetricSet{
BaseMetricSet: base,
dialInfo: dialInfo,
mongoSessions: mongoSessions,
}, nil
}

Expand All @@ -47,20 +58,52 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// descriptive error must be returned.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {

// establish connection to mongo
session, err := mgo.DialWithInfo(m.dialInfo)
if err != nil {
return nil, err
// create a wait group because we're going to spawn a goroutine for each host target
var wg sync.WaitGroup
wg.Add(len(m.mongoSessions))

// events is the value returned by this function
var events []common.MapStr

// created buffered channel to receive async results from each of the nodes
channel := make(chan []common.MapStr, len(m.mongoSessions))

for _, mongo := range m.mongoSessions {
go func(mongo *mgo.Session) {
defer wg.Done()
channel <- m.fetchNodeDbStats(mongo)
}(mongo)
}

// wait for goroutines to complete
wg.Wait()
close(channel)

// pull results off of the channel and append to events
for data := range channel {
events = append(events, data...)
}

// if we didn't get results from any node, return an error
if len(events) == 0 {
err := errors.New("Failed to retrieve db stats from all nodes")
return []common.MapStr{}, err
}
defer session.Close()

session.SetMode(mgo.Monotonic, true)
fmt.Printf("%v", events)

return events, nil
}

// fetchNodeDbStats implements the logic to fetch dbstats() for all databases on a particular
// mongo node.
func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr {

// Get the list of databases database names, which we'll use to call db.stats() on each
// Get the list of databases names, which we'll use to call db.stats() on each
dbNames, err := session.DatabaseNames()
if err != nil {
logp.Err("Error retrieving database names from Mongo instance")
return []common.MapStr{}, err
return []common.MapStr{}
}

// events is the list of events collected from each of the databases.
Expand All @@ -70,7 +113,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
for _, dbName := range dbNames {
db := session.DB(dbName)

result := map[string]interface{}{}
result := common.MapStr{}

err := db.Run("dbStats", &result)
if err != nil {
Expand All @@ -80,11 +123,5 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
events = append(events, eventMapping(result))
}

// if we failed to collect on any databases, return an error
if len(events) == 0 {
err = errors.New("Failed to fetch stats for all databases in mongo instance")
return []common.MapStr{}, err
}

return events, nil
return events
}
24 changes: 21 additions & 3 deletions metricbeat/module/mongodb/dbstats/dbstats_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package dbstats
import (
"testing"

"github.com/beats/metricbeat/module/mongodb"
mbtest "github.com/elastic/beats/metricbeat/mb/testing"
"github.com/elastic/beats/metricbeat/module/mongodb"
"github.com/stretchr/testify/assert"
)

Expand All @@ -22,11 +22,29 @@ func TestFetch(t *testing.T) {
db := event["db"].(string)
assert.NotEqual(t, db, "")

collections := event["collections"].(int64)
assert.True(t, collections > 0)

objects := event["objects"].(int64)
assert.True(t, objects > 0)

avgObjSize := event["avg_obj_size"].(int64)
assert.True(t, avgObjSize > 0)

dataSize := event["data_size"].(int64)
assert.True(t, dataSize > 0)

collections := event["collections"].(int64)
assert.True(t, collections > 0)
storageSize := event["storage_size"].(int64)
assert.True(t, storageSize > 0)

numExtents := event["num_extents"].(int64)
assert.True(t, numExtents >= 0)

indexes := event["indexes"].(int64)
assert.True(t, indexes >= 0)

indexSize := event["index_size"].(int64)
assert.True(t, indexSize > 0)
}
}

Expand Down
47 changes: 47 additions & 0 deletions metricbeat/module/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package mongodb

import (
"errors"
"fmt"
"net/url"
"strings"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"

Expand Down Expand Up @@ -61,3 +63,48 @@ func ParseURL(module mb.Module, host string) (mb.HostData, error) {

return parse.NewHostDataFromURL(u), nil
}

// NewSession returns a connection to MongoDB (*mgo.Session) by dialing the mongo
// instance specified in settings. If a connection cannot be established, a Critical error is
// thrown and the program exits
func NewSession(dialInfo *mgo.DialInfo) *mgo.Session {
mongo, err := mgo.DialWithInfo(dialInfo)
if err != nil {
logp.Critical("Failed to establish connection to MongDB at %s", dialInfo.Addrs)
}
return mongo
}

// NewDirectSessions estbalishes direct connections with a list of hosts. It uses the supplied
// dialInfo parameter as a template for establishing more direct connections
func NewDirectSessions(urls []string, dialInfo *mgo.DialInfo) ([]*mgo.Session, error) {

var nodes []*mgo.Session

for _, url := range urls {

// make a copy
nodeDialInfo := *dialInfo
nodeDialInfo.Addrs = []string{
url,
}
nodeDialInfo.Direct = true
nodeDialInfo.FailFast = true

session, err := mgo.DialWithInfo(&nodeDialInfo)
if err != nil {
logp.Err("Error establishing direct connection to mongo node at %s. Error output: %s", url, err.Error())
// set i back a value so we don't skip an index when adding successful connections
continue
}
nodes = append(nodes, session)
}

if len(nodes) == 0 {
msg := "Error establishing connection to any mongo nodes"
logp.Err(msg)
return []*mgo.Session{}, errors.New(msg)
}

return nodes, nil
}
Loading

0 comments on commit 663b4da

Please sign in to comment.