diff --git a/metricbeat/module/mongodb/dbstats/_meta/fields.yml b/metricbeat/module/mongodb/dbstats/_meta/fields.yml index 6edc9f7a8b7..5d818507799 100644 --- a/metricbeat/module/mongodb/dbstats/_meta/fields.yml +++ b/metricbeat/module/mongodb/dbstats/_meta/fields.yml @@ -33,3 +33,24 @@ - name: storage_size type: long + + - name: ns_size_mb + type: long + + - name: data_file_version + type: group + fields: + - name: major + type: long + + - name: minor + type: long + + - name: extent_free_list + type: group + fields: + - name: num + type: long + + - name: size + type: long diff --git a/metricbeat/module/mongodb/dbstats/data.go b/metricbeat/module/mongodb/dbstats/data.go index 83eb7345805..dfd96875d86 100644 --- a/metricbeat/module/mongodb/dbstats/data.go +++ b/metricbeat/module/mongodb/dbstats/data.go @@ -15,7 +15,20 @@ var schema = s.Schema{ "num_extents": c.Int("numExtents"), "indexes": c.Int("indexes"), "index_size": c.Int("indexSize"), - "file_size": c.Int("fileSize"), + // mmapv1 only + "ns_size_mb": c.Int("nsSizeMB"), + // mmapv1 only + "file_size": c.Int("fileSize"), + // mmapv1 only + "data_file_version": c.Dict("dataFileVersion", s.Schema{ + "major": c.Int("major"), + "minor": c.Int("minor"), + }), + // mmapv1 only + "extent_free_list": c.Dict("extentFreeList", s.Schema{ + "num": c.Int("num"), + "size": c.Int("size"), + }), } var eventMapping = schema.Apply diff --git a/metricbeat/module/mongodb/dbstats/dbstats.go b/metricbeat/module/mongodb/dbstats/dbstats.go index 2bf00396cad..4a43c6a9c53 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats.go +++ b/metricbeat/module/mongodb/dbstats/dbstats.go @@ -66,7 +66,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { var events []common.MapStr // created buffered channel to receive async results from each of the nodes - channel := make(chan interface{}, len(m.mongoSessions)) + channel := make(chan []common.MapStr, len(m.mongoSessions)) for _, mongo := range m.mongoSessions { go func(mongo *mgo.Session) { @@ -81,7 +81,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // pull results off of the channel and append to events for data := range channel { - events = append(events, data.([]common.MapStr)...) + events = append(events, data...) } // if we didn't get results from any node, return an error @@ -123,11 +123,5 @@ func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr { events = append(events, eventMapping(result)) } - // if we failed to collect on any databases, return an error - if len(events) == 0 { - err = errors.New("Failed to fetch stats for all databases in mongo instance") - return []common.MapStr{} - } - return events } diff --git a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go index 85a05b931ff..b685af2584e 100644 --- a/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go +++ b/metricbeat/module/mongodb/dbstats/dbstats_integration_test.go @@ -45,10 +45,6 @@ func TestFetch(t *testing.T) { indexSize := event["index_size"].(int64) assert.True(t, indexSize > 0) - - fileSize := event["file_size"].(int64) - assert.True(t, fileSize > 0) - } } diff --git a/metricbeat/module/mongodb/status/status.go b/metricbeat/module/mongodb/status/status.go index c557efd7d82..2025d6526c2 100644 --- a/metricbeat/module/mongodb/status/status.go +++ b/metricbeat/module/mongodb/status/status.go @@ -71,12 +71,15 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { var events []common.MapStr // created buffered channel to receive async results from each of the nodes - channel := make(chan interface{}, len(m.mongoSessions)) + channel := make(chan common.MapStr, len(m.mongoSessions)) for _, mongo := range m.mongoSessions { go func(mongo *mgo.Session) { defer wg.Done() - channel <- m.fetchNodeStatus(mongo) + result := m.fetchNodeStatus(mongo) + if result != nil { + channel <- result + } }(mongo) } @@ -86,7 +89,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) { // pull results off of the channel and append to events for data := range channel { - events = append(events, data.(common.MapStr)) + events = append(events, data) } // if we didn't get results from any node, return an error diff --git a/metricbeat/module/mongodb/status/status_integration_test.go b/metricbeat/module/mongodb/status/status_integration_test.go index c86d850abab..bef15ff3f3f 100644 --- a/metricbeat/module/mongodb/status/status_integration_test.go +++ b/metricbeat/module/mongodb/status/status_integration_test.go @@ -1,5 +1,3 @@ -// +build integration - package status import ( @@ -12,28 +10,30 @@ import ( ) func TestFetch(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - event, err := f.Fetch() + f := mbtest.NewEventsFetcher(t, getConfig()) + events, err := f.Fetch() if !assert.NoError(t, err) { t.FailNow() } - t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) + for _, event := range events { + t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event) - // Check event fields - current := event["connections"].(common.MapStr)["current"].(int64) - assert.True(t, current >= 0) + // Check event fields + current := event["connections"].(common.MapStr)["current"].(int64) + assert.True(t, current >= 0) - available := event["connections"].(common.MapStr)["available"].(int64) - assert.True(t, available > 0) + available := event["connections"].(common.MapStr)["available"].(int64) + assert.True(t, available > 0) - pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64) - assert.True(t, pageFaults >= 0) + pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64) + assert.True(t, pageFaults >= 0) + } } func TestData(t *testing.T) { - f := mbtest.NewEventFetcher(t, getConfig()) - err := mbtest.WriteEvent(f, t) + f := mbtest.NewEventsFetcher(t, getConfig()) + err := mbtest.WriteEvents(f, t) if err != nil { t.Fatal("write", err) }