Skip to content

Commit

Permalink
debugged integration tests to work with metricbeats testsuite. added …
Browse files Browse the repository at this point in the history
…additional fields to dbstats metricset
  • Loading branch information
Scott Crespo committed Dec 29, 2016
1 parent a4a6e75 commit 3c6d97c
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 30 deletions.
21 changes: 21 additions & 0 deletions metricbeat/module/mongodb/dbstats/_meta/fields.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,24 @@

- name: storage_size
type: long

- name: ns_size_mb
type: long

- name: data_file_version
type: group
fields:
- name: major
type: long

- name: minor
type: long

- name: extent_free_list
type: group
fields:
- name: num
type: long

- name: size
type: long
15 changes: 14 additions & 1 deletion metricbeat/module/mongodb/dbstats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,20 @@ var schema = s.Schema{
"num_extents": c.Int("numExtents"),
"indexes": c.Int("indexes"),
"index_size": c.Int("indexSize"),
"file_size": c.Int("fileSize"),
// mmapv1 only
"ns_size_mb": c.Int("nsSizeMB"),
// mmapv1 only
"file_size": c.Int("fileSize"),
// mmapv1 only
"data_file_version": c.Dict("dataFileVersion", s.Schema{
"major": c.Int("major"),
"minor": c.Int("minor"),
}),
// mmapv1 only
"extent_free_list": c.Dict("extentFreeList", s.Schema{
"num": c.Int("num"),
"size": c.Int("size"),
}),
}

var eventMapping = schema.Apply
10 changes: 2 additions & 8 deletions metricbeat/module/mongodb/dbstats/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
var events []common.MapStr

// created buffered channel to receive async results from each of the nodes
channel := make(chan interface{}, len(m.mongoSessions))
channel := make(chan []common.MapStr, len(m.mongoSessions))

for _, mongo := range m.mongoSessions {
go func(mongo *mgo.Session) {
Expand All @@ -81,7 +81,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {

// pull results off of the channel and append to events
for data := range channel {
events = append(events, data.([]common.MapStr)...)
events = append(events, data...)
}

// if we didn't get results from any node, return an error
Expand Down Expand Up @@ -123,11 +123,5 @@ func (m *MetricSet) fetchNodeDbStats(session *mgo.Session) []common.MapStr {
events = append(events, eventMapping(result))
}

// if we failed to collect on any databases, return an error
if len(events) == 0 {
err = errors.New("Failed to fetch stats for all databases in mongo instance")
return []common.MapStr{}
}

return events
}
4 changes: 0 additions & 4 deletions metricbeat/module/mongodb/dbstats/dbstats_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,6 @@ func TestFetch(t *testing.T) {

indexSize := event["index_size"].(int64)
assert.True(t, indexSize > 0)

fileSize := event["file_size"].(int64)
assert.True(t, fileSize > 0)

}
}

Expand Down
9 changes: 6 additions & 3 deletions metricbeat/module/mongodb/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,15 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {
var events []common.MapStr

// created buffered channel to receive async results from each of the nodes
channel := make(chan interface{}, len(m.mongoSessions))
channel := make(chan common.MapStr, len(m.mongoSessions))

for _, mongo := range m.mongoSessions {
go func(mongo *mgo.Session) {
defer wg.Done()
channel <- m.fetchNodeStatus(mongo)
result := m.fetchNodeStatus(mongo)
if result != nil {
channel <- result
}
}(mongo)
}

Expand All @@ -86,7 +89,7 @@ func (m *MetricSet) Fetch() ([]common.MapStr, error) {

// pull results off of the channel and append to events
for data := range channel {
events = append(events, data.(common.MapStr))
events = append(events, data)
}

// if we didn't get results from any node, return an error
Expand Down
28 changes: 14 additions & 14 deletions metricbeat/module/mongodb/status/status_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// +build integration

package status

import (
Expand All @@ -12,28 +10,30 @@ import (
)

func TestFetch(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
event, err := f.Fetch()
f := mbtest.NewEventsFetcher(t, getConfig())
events, err := f.Fetch()
if !assert.NoError(t, err) {
t.FailNow()
}

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)
for _, event := range events {
t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event)

// Check event fields
current := event["connections"].(common.MapStr)["current"].(int64)
assert.True(t, current >= 0)
// Check event fields
current := event["connections"].(common.MapStr)["current"].(int64)
assert.True(t, current >= 0)

available := event["connections"].(common.MapStr)["available"].(int64)
assert.True(t, available > 0)
available := event["connections"].(common.MapStr)["available"].(int64)
assert.True(t, available > 0)

pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64)
assert.True(t, pageFaults >= 0)
pageFaults := event["extra_info"].(common.MapStr)["page_faults"].(int64)
assert.True(t, pageFaults >= 0)
}
}

func TestData(t *testing.T) {
f := mbtest.NewEventFetcher(t, getConfig())
err := mbtest.WriteEvent(f, t)
f := mbtest.NewEventsFetcher(t, getConfig())
err := mbtest.WriteEvents(f, t)
if err != nil {
t.Fatal("write", err)
}
Expand Down

0 comments on commit 3c6d97c

Please sign in to comment.