From b6f8da358c3833557f755202a006c29ad01b516c Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 22 Dec 2016 13:50:32 -0500 Subject: [PATCH 01/11] new implementation that establishes direct connetions to mongo hosts --- metricbeat/module/mongodb/dbstats/dbstats.go | 59 +++++++++++++++----- metricbeat/module/mongodb/mongodb.go | 46 +++++++++++++++ metricbeat/module/mongodb/status/status.go | 28 ++++++---- 3 files changed, 110 insertions(+), 23 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index 8cdadaae030..c417fdb6e44 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -2,10 +2,13 @@ package dbstats import ( "errors" + "fmt" + "sync" "github.com/beats/libbeat/logp" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" + "github.com/scottcrespo/beats/metricbeat/module/mongodb" "gopkg.in/mgo.v2" ) @@ -23,10 +26,11 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - dialInfo *mgo.DialInfo + dialInfo *mgo.DialInfo + mongoSessions []*mgo.Session } -// New create a new instance of the MetricSet +// New creates a new instance of the MetricSet // Part of new is also setting up the configuration by processing additional // configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { @@ -36,9 +40,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } dialInfo.Timeout = base.Module().Config().Timeout + mongoSessions, err := mongodb.NewDirectSessions(dialInfo.Addrs, dialInfo) + if err != nil { + return nil, err + } + return &MetricSet{ BaseMetricSet: base, dialInfo: dialInfo, + mongoSessions: mongoSessions, }, nil } @@ -46,21 +56,44 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { + var wg sync.WaitGroup + wg.Add(len(m.mongoSessions)) - // establish connection to mongo - session, err := mgo.DialWithInfo(m.dialInfo) - if err != nil { - return nil, err + var events []common.MapStr + + channel := make(chan interface{}, len(m.mongoSessions)) + + // fetch stats from each individual mongo node + for _, mongo := range m.mongoSessions { + go func(mongo *mgo.Session, wg *sync.WaitGroup) { + defer wg.Done() + channel <- m.fetchNodeDbStats(mongo) + }(mongo, &wg) + } + // wait for goroutines to complete + wg.Wait() + // pull results off of the channel and append to events + for data := range channel { + events = append(events, data.([]common.MapStr)...) } - defer session.Close() - session.SetMode(mgo.Monotonic, true) + // if we didn't get results from any node, return an error + if len(events) == 0 { + err := errors.New("Failed to retrieve db stats from all nodes") + return []common.MapStr{}, err + } + + fmt.Printf("%v", events) - // Get the list of databases database names, which we'll use to call db.stats() on each + return events, nil +} + +func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr { + // Get the list of databases names, which we'll use to call db.stats() on each dbNames, err := session.DatabaseNames() if err != nil { logp.Err("Error retrieving database names from Mongo instance") - return []common.MapStr{}, err + return []common.MapStr{} } // events is the list of events collected from each of the databases. @@ -70,7 +103,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { for _, dbName := range dbNames { db := session.DB(dbName) - result := map[string]interface{}{} + result := common.MapStr{} err := db.Run("dbStats", &result) if err != nil { @@ -83,8 +116,8 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // if we failed to collect on any databases, return an error if len(events) == 0 { err = errors.New("Failed to fetch stats for all databases in mongo instance") - return []common.MapStr{}, err + return []common.MapStr{} } - return events, nil + return events } diff --git a/metricbeat/module/mongodb/mongodb.go b/metricbeat/module/mongodb/mongodb.go index 6cf8332601c..9bcad3fe4a5 100644 --- a/metricbeat/module/mongodb/mongodb.go +++ b/metricbeat/module/mongodb/mongodb.go @@ -1,10 +1,12 @@ package mongodb import ( + "errors" "fmt" "net/url" "strings" + "github.com/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" @@ -61,3 +63,47 @@ func ParseURL(module mb.Module, host string) (mb.HostData, error) { return parse.NewHostDataFromURL(u), nil } + +// NewSession returns a connection to MongoDB (*mgo.Session) by dialing the mongo +// instance specified in settings. If a connection cannot be established, a Critical error is +// thrown and the program exits +func NewSession(dialInfo *mgo.DialInfo) *mgo.Session { + mongo, err := mgo.DialWithInfo(dialInfo) + if err != nil { + logp.Critical("Failed to establish connection to MongDB at %s", dialInfo.Addrs) + } + return mongo +} + +// NewDirectSessions estbalishes direct connections with a list of hosts. It uses the supplied +// dialInfo parameter as a template for establishing more direct connections +func NewDirectSessions(urls []string, dialInfo *mgo.DialInfo) ([]*mgo.Session, error) { + + var nodes []*mgo.Session + + for _, url := range urls { + + // make a copy + nodeDialInfo := *dialInfo + nodeDialInfo.Addrs = []string{ + url, + } + nodeDialInfo.Direct = true + + session, err := mgo.DialWithInfo(&nodeDialInfo) + if err != nil { + logp.Err("Error establishing direct connection to mongo node at %s. Error output: %s", url, err.Error()) + // set i back a value so we don't skip an index when adding successful connections + continue + } + nodes = append(nodes, session) + } + + if len(nodes) == 0 { + msg := "Error establishing connection to any mongo nodes" + logp.Err(msg) + return []*mgo.Session{}, errors.New(msg) + } + + return nodes, nil +} diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index 35335a6f6b3..eb31c510cce 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -4,7 +4,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" - "github.com/elastic/beats/metricbeat/module/mongodb" + "github.com/scottcrespo/beats/metricbeat/module/mongodb" "github.com/pkg/errors" "gopkg.in/mgo.v2" @@ -25,11 +25,19 @@ func init() { } } +// MetricSet type defines all fields of the MetricSet +// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with +// additional entries. These variables can be used to persist data or configuration between +// multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - dialInfo *mgo.DialInfo + dialInfo *mgo.DialInfo + mongoSession *mgo.Session } +// New creates a new instance of the MetricSet +// Part of new is also setting up the configuration by processing additional +// configuration entries if needed. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { dialInfo, err := mgo.ParseURL(base.HostData().URI) if err != nil { @@ -37,22 +45,22 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } dialInfo.Timeout = base.Module().Config().Timeout + mongoSession := mongodb.NewMongoSession(dialInfo) + mongoSession.SetMode(mgo.Monotonic, true) + return &MetricSet{ BaseMetricSet: base, dialInfo: dialInfo, + mongoSession: mongoSession, }, nil } +// Fetch methods implements the data gathering and data conversion to the right format +// It returns the event which is then forward to the output. In case of an error, a +// descriptive error must be returned. func (m *MetricSet) Fetch() (common.MapStr, error) { - session, err := mgo.DialWithInfo(m.dialInfo) - if err != nil { - return nil, err - } - defer session.Close() - - session.SetMode(mgo.Monotonic, true) result := map[string]interface{}{} - if err := session.DB("admin").Run(bson.D{{"serverStatus", 1}}, &result); err != nil { + if err := m.mongoSession.DB("admin").Run(bson.D{{"serverStatus", 1}}, &result); err != nil { return nil, errors.Wrap(err, "mongodb fetch failed") } From 74f9ee03cf0df76624901dd8e92d96dd6f180b75 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Wed, 28 Dec 2016 12:02:53 -0500 Subject: [PATCH 02/11] successful reporting using direct node connections --- metricbeat/module/mongodb/dbstats/dbstats.go | 8 +++++--- .../module/mongodb/dbstats/dbstats_integration_test.go | 2 +- metricbeat/module/mongodb/mongodb.go | 2 +- metricbeat/module/mongodb/status/status.go | 4 ++-- 4 files changed, 9 insertions(+), 7 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index c417fdb6e44..dc06d4df6f2 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -5,10 +5,10 @@ import ( "fmt" "sync" - "github.com/beats/libbeat/logp" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" - "github.com/scottcrespo/beats/metricbeat/module/mongodb" + "github.com/elastic/beats/metricbeat/module/mongodb" "gopkg.in/mgo.v2" ) @@ -72,12 +72,14 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { } // wait for goroutines to complete wg.Wait() + close(channel) + // pull results off of the channel and append to events for data := range channel { events = append(events, data.([]common.MapStr)...) } - // if we didn't get results from any node, return an error + //if we didn't get results from any node, return an error if len(events) == 0 { err := errors.New("Failed to retrieve db stats from all nodes") return []common.MapStr{}, err diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 6a685aa4485..b509fb3899d 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -3,8 +3,8 @@ package dbstats import ( "testing" - "github.com/beats/metricbeat/module/mongodb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" + "github.com/elastic/beats/metricbeat/module/mongodb" "github.com/stretchr/testify/assert" ) diff --git a/metricbeat/module/mongodb/mongodb.go b/metricbeat/module/mongodb/mongodb.go index 9bcad3fe4a5..3dd52ff1238 100644 --- a/metricbeat/module/mongodb/mongodb.go +++ b/metricbeat/module/mongodb/mongodb.go @@ -6,7 +6,7 @@ import ( "net/url" "strings" - "github.com/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index eb31c510cce..37f478ce924 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -4,7 +4,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" - "github.com/scottcrespo/beats/metricbeat/module/mongodb" + "github.com/elastic/beats/metricbeat/module/mongodb" "github.com/pkg/errors" "gopkg.in/mgo.v2" @@ -45,7 +45,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } dialInfo.Timeout = base.Module().Config().Timeout - mongoSession := mongodb.NewMongoSession(dialInfo) + mongoSession := mongodb.NewSession(dialInfo) mongoSession.SetMode(mgo.Monotonic, true) return &MetricSet{ From dd895b503406501613bfebe0ba3914f088318981 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 13:02:57 -0500 Subject: [PATCH 03/11] debugged field name. integration tests pass --- metricbeat/module/mongodb/dbstats/data.go | 2 +- metricbeat/module/mongodb/dbstats/dbstats.go | 1 + metricbeat/module/mongodb/dbstats/dbstats_integration_test.go | 2 +- 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/data.go b/metricbeat/module/mongodb/dbstats/data.go index 543fcda96f0..682cbba2ad6 100644 --- a/metricbeat/module/mongodb/dbstats/data.go +++ b/metricbeat/module/mongodb/dbstats/data.go @@ -7,7 +7,7 @@ import ( var schema = s.Schema{ "db": c.Str("db"), - "collection": c.Int("collections"), + "collections": c.Int("collections"), "objects": c.Int("objects"), "avg_object_size": c.Int("avgObjectSize"), "data_size": c.Int("dataSize"), diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index dc06d4df6f2..fdbf168b9d8 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -40,6 +40,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } dialInfo.Timeout = base.Module().Config().Timeout + // instantiate direct connections to each of the configured Mongo hosts mongoSessions, err := mongodb.NewDirectSessions(dialInfo.Addrs, dialInfo) if err != nil { return nil, err diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index b509fb3899d..30226b2b088 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -24,7 +24,7 @@ func TestFetch(t *testing.T) { dataSize := event["data_size"].(int64) assert.True(t, dataSize > 0) - + t.Logf("HELLO WORLD LN 27") collections := event["collections"].(int64) assert.True(t, collections > 0) } From 2c9224503ce8bc2f1060ead66830679be0e5f67d Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 13:18:53 -0500 Subject: [PATCH 04/11] debugged avg_obj_size field for proper mapping --- .../module/mongodb/dbstats/_meta/fields.yml | 2 +- metricbeat/module/mongodb/dbstats/data.go | 20 ++++++------- .../dbstats/dbstats_integration_test.go | 28 +++++++++++++++++-- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/_meta/fields.yml b/metricbeat/module/mongodb/dbstats/_meta/fields.yml index d2c0a02a3e2..6edc9f7a8b7 100644 --- a/metricbeat/module/mongodb/dbstats/_meta/fields.yml +++ b/metricbeat/module/mongodb/dbstats/_meta/fields.yml @@ -4,7 +4,7 @@ dbstats provides an overview of a particular mongo database. This document is most concerned with data volumes of a database. fields: - - name: avg_object_size + - name: avg_obj_size type: long - name: collections diff --git a/metricbeat/module/mongodb/dbstats/data.go b/metricbeat/module/mongodb/dbstats/data.go index 682cbba2ad6..83eb7345805 100644 --- a/metricbeat/module/mongodb/dbstats/data.go +++ b/metricbeat/module/mongodb/dbstats/data.go @@ -6,16 +6,16 @@ import ( ) var schema = s.Schema{ - "db": c.Str("db"), - "collections": c.Int("collections"), - "objects": c.Int("objects"), - "avg_object_size": c.Int("avgObjectSize"), - "data_size": c.Int("dataSize"), - "storage_size": c.Int("storageSize"), - "num_extents": c.Int("numExtents"), - "indexes": c.Int("indexes"), - "index_size": c.Int("indexSize"), - "file_size": c.Int("fileSize"), + "db": c.Str("db"), + "collections": c.Int("collections"), + "objects": c.Int("objects"), + "avg_obj_size": c.Int("avgObjSize"), + "data_size": c.Int("dataSize"), + "storage_size": c.Int("storageSize"), + "num_extents": c.Int("numExtents"), + "indexes": c.Int("indexes"), + "index_size": c.Int("indexSize"), + "file_size": c.Int("fileSize"), } var eventMapping = schema.Apply diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 30226b2b088..321de691c10 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -22,11 +22,33 @@ func TestFetch(t *testing.T) { db := event["db"].(string) assert.NotEqual(t, db, "") - dataSize := event["data_size"].(int64) - assert.True(t, dataSize > 0) - t.Logf("HELLO WORLD LN 27") collections := event["collections"].(int64) assert.True(t, collections > 0) + + objects := event["objects"].(int64) + assert.True(t, objects > 0) + + avgObjSize := event["avg_obj_size"].(int64) + assert.True(t, avgObjSize > 0) + + dataSize := event["data_size"].(int64) + assert.True(t, dataSize > 0) + + storageSize := event["storage_size"].(int64) + assert.True(t, storageSize > 0) + + numExtents := event["num_extents"].(int64) + assert.True(t, numExtents > 0) + + indexes := event["indexes"].(int64) + assert.True(t, indexes > 0) + + indexSize := event["index_size"].(int64) + assert.True(t, indexSize > 0) + + fileSize := event["file_size"].(int64) + assert.True(t, fileSize > 0) + } } From 5a9ae7f0ef9d1aeb6be0714f7cc0b2d3f91c9289 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 15:07:16 -0500 Subject: [PATCH 05/11] set FailFast = True for direct connections to prevent a nonresponsive node from holding up the reporting --- metricbeat/module/mongodb/dbstats/dbstats.go | 15 +++++++++++---- metricbeat/module/mongodb/mongodb.go | 1 + 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index fdbf168b9d8..0c13c87f35d 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -57,20 +57,24 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch() ([]common.MapStr, error) { + + // create a wait group because we're going to spawn a goroutine for each host target var wg sync.WaitGroup wg.Add(len(m.mongoSessions)) + // events is the value returned by this function var events []common.MapStr + // created buffered channel to receive async results from each of the nodes channel := make(chan interface{}, len(m.mongoSessions)) - // fetch stats from each individual mongo node for _, mongo := range m.mongoSessions { - go func(mongo *mgo.Session, wg *sync.WaitGroup) { + go func(mongo *mgo.Session) { defer wg.Done() channel <- m.fetchNodeDbStats(mongo) - }(mongo, &wg) + }(mongo) } + // wait for goroutines to complete wg.Wait() close(channel) @@ -80,7 +84,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { events = append(events, data.([]common.MapStr)...) } - //if we didn't get results from any node, return an error + // if we didn't get results from any node, return an error if len(events) == 0 { err := errors.New("Failed to retrieve db stats from all nodes") return []common.MapStr{}, err @@ -91,7 +95,10 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { return events, nil } +// fetchNodeDbStats implements the logic to fetch dbstats() for all databases on a particular +// mongo node. func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr { + // Get the list of databases names, which we'll use to call db.stats() on each dbNames, err := session.DatabaseNames() if err != nil { diff --git a/metricbeat/module/mongodb/mongodb.go b/metricbeat/module/mongodb/mongodb.go index 3dd52ff1238..0549f2bb305 100644 --- a/metricbeat/module/mongodb/mongodb.go +++ b/metricbeat/module/mongodb/mongodb.go @@ -89,6 +89,7 @@ func NewDirectSessions(urls []string, dialInfo *mgo.DialInfo) ([]*mgo.Session, e url, } nodeDialInfo.Direct = true + nodeDialInfo.FailFast = true session, err := mgo.DialWithInfo(&nodeDialInfo) if err != nil { From 8e1934ca722a249bcaa17e55c4b55a2cd0edc889 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 17:08:13 -0500 Subject: [PATCH 06/11] provide mongodb host parser to AddMetricSet() --- metricbeat/module/mongodb/dbstats/dbstats.go | 2 +- metricbeat/module/mongodb/status/status.go | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index 0c13c87f35d..2bf00396cad 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -15,7 +15,7 @@ import ( // init registers the MetricSet with the central registry. // The New method will be called after the setup of the module and before starting to fetch data func init() { - if err := mb.Registry.AddMetricSet("mongodb", "dbstats", New); err != nil { + if err := mb.Registry.AddMetricSet("mongodb", "dbstats", New, mongodb.ParseURL); err != nil { panic(err) } } diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index 37f478ce924..504bbc96fd8 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -2,7 +2,6 @@ package status import ( "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/mongodb" @@ -17,8 +16,6 @@ TODOs: * add a metricset for "metrics" data */ -var debugf = logp.MakeDebug("mongodb.status") - func init() { if err := mb.Registry.AddMetricSet("mongodb", "status", New, mongodb.ParseURL); err != nil { panic(err) From 1f420d73d2b4e3a71fbb574233ab4b43322262f9 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 17:53:41 -0500 Subject: [PATCH 07/11] implementing multi-node direct reporting for mongodb.serverStatus --- metricbeat/module/mongodb/status/status.go | 65 ++++++++++++++++++---- 1 file changed, 55 insertions(+), 10 deletions(-) diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index 504bbc96fd8..912d3d27645 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -1,6 +1,9 @@ package status import ( + "fmt" + "sync" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/module/mongodb" @@ -28,8 +31,8 @@ func init() { // multiple fetch calls. type MetricSet struct { mb.BaseMetricSet - dialInfo *mgo.DialInfo - mongoSession *mgo.Session + dialInfo *mgo.DialInfo + mongoSessions []*mgo.Session } // New creates a new instance of the MetricSet @@ -42,24 +45,66 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } dialInfo.Timeout = base.Module().Config().Timeout - mongoSession := mongodb.NewSession(dialInfo) - mongoSession.SetMode(mgo.Monotonic, true) + // instantiate direct connections to each of the configured Mongo hosts + mongoSessions, err := mongodb.NewDirectSessions(dialInfo.Addrs, dialInfo) + if err != nil { + return nil, err + } return &MetricSet{ BaseMetricSet: base, dialInfo: dialInfo, - mongoSession: mongoSession, + mongoSessions: mongoSessions, }, nil } // Fetch methods implements the data gathering and data conversion to the right format // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. -func (m *MetricSet) Fetch() (common.MapStr, error) { - result := map[string]interface{}{} - if err := m.mongoSession.DB("admin").Run(bson.D{{"serverStatus", 1}}, &result); err != nil { - return nil, errors.Wrap(err, "mongodb fetch failed") +func (m *MetricSet) Fetch() ([]common.MapStr, error) { + + // create a wait group because we're going to spawn a goroutine for each host target + var wg sync.WaitGroup + wg.Add(len(m.mongoSessions)) + + // events is the value returned by this function + var events []common.MapStr + + // created buffered channel to receive async results from each of the nodes + channel := make(chan interface{}, len(m.mongoSessions)) + + for _, mongo := range m.mongoSessions { + go func(mongo *mgo.Session) { + defer wg.Done() + channel <- m.fetchNodeStatus(mongo) + }(mongo) + } + + // wait for goroutines to complete + wg.Wait() + close(channel) + + // pull results off of the channel and append to events + for data := range channel { + events = append(events, data.(common.MapStr)) + } + + // if we didn't get results from any node, return an error + if len(events) == 0 { + err := errors.New("Failed to retrieve db stats from all nodes") + return events, err + } + + fmt.Printf("%v", events) + + return events, nil +} + +func (m *MetricSet) fetchNodeStatus(session *mgo.Session) common.MapStr { + result := common.MapStr{} + if err := session.DB("admin").Run(bson.D{{"serverStatus", 1}}, &result); err != nil { + return nil } - return eventMapping(result), nil + return eventMapping(result) } From 65d8d8924f36c5446a5c06b8004967e7e4ea94a9 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 17:57:36 -0500 Subject: [PATCH 08/11] changed database response back to map[string]interface{}{}. otherwise fields get lost when calling eventMapping() --- metricbeat/metricbeat.yml | 6 +++++- metricbeat/module/mongodb/status/status.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/metricbeat/metricbeat.yml b/metricbeat/metricbeat.yml index 54328186bf6..b96bdcb271b 100644 --- a/metricbeat/metricbeat.yml +++ b/metricbeat/metricbeat.yml @@ -43,7 +43,11 @@ metricbeat.modules: period: 10s processes: ['.*'] - +- module: mongodb + metricsets: ["status"] + enabled: true + period: 1s + hosts: ["localhost:27017"] #================================ General ===================================== diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index 912d3d27645..c557efd7d82 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -101,7 +101,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { } func (m *MetricSet) fetchNodeStatus(session *mgo.Session) common.MapStr { - result := common.MapStr{} + result := map[string]interface{}{} if err := session.DB("admin").Run(bson.D{{"serverStatus", 1}}, &result); err != nil { return nil } From a4a6e7520bc09a118bdf5ff7c7b183472352e0bb Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 18:09:47 -0500 Subject: [PATCH 09/11] debugging test assertion values so integration test suite passes --- metricbeat/module/mongodb/dbstats/dbstats_integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 321de691c10..85a05b931ff 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -38,10 +38,10 @@ func TestFetch(t *testing.T) { assert.True(t, storageSize > 0) numExtents := event["num_extents"].(int64) - assert.True(t, numExtents > 0) + assert.True(t, numExtents >= 0) indexes := event["indexes"].(int64) - assert.True(t, indexes > 0) + assert.True(t, indexes >= 0) indexSize := event["index_size"].(int64) assert.True(t, indexSize > 0) From 3c6d97c482e209c0f3429c01750ab9a6997ec849 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 18:50:38 -0500 Subject: [PATCH 10/11] debugged integration tests to work with metricbeats testsuite. added additional fields to dbstats metricset --- .../module/mongodb/dbstats/_meta/fields.yml | 21 ++++++++++++++ metricbeat/module/mongodb/dbstats/data.go | 15 +++++++++- metricbeat/module/mongodb/dbstats/dbstats.go | 10 ++----- .../dbstats/dbstats_integration_test.go | 4 --- metricbeat/module/mongodb/status/status.go | 9 ++++-- .../mongodb/status/status_integration_test.go | 28 +++++++++---------- 6 files changed, 57 insertions(+), 30 deletions(-) diff --git a/metricbeat/module/mongodb/dbstats/_meta/fields.yml b/metricbeat/module/mongodb/dbstats/_meta/fields.yml index 6edc9f7a8b7..5d818507799 100644 --- a/metricbeat/module/mongodb/dbstats/_meta/fields.yml +++ b/metricbeat/module/mongodb/dbstats/_meta/fields.yml @@ -33,3 +33,24 @@ - name: storage_size type: long + + - name: ns_size_mb + type: long + + - name: data_file_version + type: group + fields: + - name: major + type: long + + - name: minor + type: long + + - name: extent_free_list + type: group + fields: + - name: num + type: long + + - name: size + type: long diff --git a/metricbeat/module/mongodb/dbstats/data.go b/metricbeat/module/mongodb/dbstats/data.go index 83eb7345805..dfd96875d86 100644 --- a/metricbeat/module/mongodb/dbstats/data.go +++ b/metricbeat/module/mongodb/dbstats/data.go @@ -15,7 +15,20 @@ var schema = s.Schema{ "num_extents": c.Int("numExtents"), "indexes": c.Int("indexes"), "index_size": c.Int("indexSize"), - "file_size": c.Int("fileSize"), + // mmapv1 only + "ns_size_mb": c.Int("nsSizeMB"), + // mmapv1 only + "file_size": c.Int("fileSize"), + // mmapv1 only + "data_file_version": c.Dict("dataFileVersion", s.Schema{ + "major": c.Int("major"), + "minor": c.Int("minor"), + }), + // mmapv1 only + "extent_free_list": c.Dict("extentFreeList", s.Schema{ + "num": c.Int("num"), + "size": c.Int("size"), + }), } var eventMapping = schema.Apply diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index 2bf00396cad..4a43c6a9c53 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -66,7 +66,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { var events []common.MapStr // created buffered channel to receive async results from each of the nodes - channel := make(chan interface{}, len(m.mongoSessions)) + channel := make(chan []common.MapStr, len(m.mongoSessions)) for _, mongo := range m.mongoSessions { go func(mongo *mgo.Session) { @@ -81,7 +81,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // pull results off of the channel and append to events for data := range channel { - events = append(events, data.([]common.MapStr)...) + events = append(events, data...) } // if we didn't get results from any node, return an error @@ -123,11 +123,5 @@ func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr { events = append(events, eventMapping(result)) } - // if we failed to collect on any databases, return an error - if len(events) == 0 { - err = errors.New("Failed to fetch stats for all databases in mongo instance") - return []common.MapStr{} - } - return events } diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 85a05b931ff..b685af2584e 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -45,10 +45,6 @@ func TestFetch(t *testing.T) { indexSize := event["index_size"].(int64) assert.True(t, indexSize > 0) - - fileSize := event["file_size"].(int64) - assert.True(t, fileSize > 0) - } } diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index c557efd7d82..2025d6526c2 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -71,12 +71,15 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { var events []common.MapStr // created buffered channel to receive async results from each of the nodes - channel := make(chan interface{}, len(m.mongoSessions)) + channel := make(chan common.MapStr, len(m.mongoSessions)) for _, mongo := range m.mongoSessions { go func(mongo *mgo.Session) { defer wg.Done() - channel <- m.fetchNodeStatus(mongo) + result := m.fetchNodeStatus(mongo) + if result != nil { + channel <- result + } }(mongo) } @@ -86,7 +89,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // pull results off of the channel and append to events for data := range channel { - events = append(events, data.(common.MapStr)) + events = append(events, data) } // if we didn't get results from any node, return an error diff --git a/metricbeat/module/mongodb/status/status_integration_test.go b/metricbeat/module/mongodb/status/status_integration_test.go index c86d850abab..bef15ff3f3f 100644 --- a/metricbeat/module/mongodb/status/status_integration_test.go +++ b/metricbeat/module/mongodb/status/status_integration_test.go @@ -1,5 +1,3 @@ -// +build integration - package status import ( @@ -12,28 +10,30 @@ import ( ) func TestFetch(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - event, err := f.Fetch() + f := mbtest.NewEventsFetcher(t, getConfig()) + events, err := f.Fetch() if !assert.NoError(t, err) { t.FailNow() } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + for _, event := range events { + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) - // Check event fields - current := event["connections"].(common.MapStr)["current"].(int64) - assert.True(t, current >= 0) + // Check event fields + current := event["connections"].(common.MapStr)["current"].(int64) + assert.True(t, current >= 0) - available := event["connections"].(common.MapStr)["available"].(int64) - assert.True(t, available > 0) + available := event["connections"].(common.MapStr)["available"].(int64) + assert.True(t, available > 0) - pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64) - assert.True(t, pageFaults >= 0) + pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64) + assert.True(t, pageFaults >= 0) + } } func TestData(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) + f := mbtest.NewEventsFetcher(t, getConfig()) + err := mbtest.WriteEvents(f, t) if err != nil { t.Fatal("write", err) } From aafaafeb1203146eb299479b19bc70b12ca03f40 Mon Sep 17 00:00:00 2001 From: Scott Crespo Date: Thu, 29 Dec 2016 18:53:16 -0500 Subject: [PATCH 11/11] copied original metricbeat.yml. Accidentally over-wrote it --- metricbeat/metricbeat.yml | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/metricbeat/metricbeat.yml b/metricbeat/metricbeat.yml index b96bdcb271b..54328186bf6 100644 --- a/metricbeat/metricbeat.yml +++ b/metricbeat/metricbeat.yml @@ -43,11 +43,7 @@ metricbeat.modules: period: 10s processes: ['.*'] -- module: mongodb - metricsets: ["status"] - enabled: true - period: 1s - hosts: ["localhost:27017"] + #================================ General =====================================