Skip to content

Commit

Permalink
Upgrade Mongodb library in Beats to accept v5 metrics (#31185)
Browse files Browse the repository at this point in the history
  • Loading branch information
sayden authored May 17, 2022
1 parent aafa2d4 commit 49ae09a
Show file tree
Hide file tree
Showing 29 changed files with 1,691 additions and 951 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Add orchestrator cluster ECS fields in kubernetes events {pull}31341[31341]
- Generic SQL code reorganization, with support for raw metrics and query lists {pull}31568[31568]
- Add metadata for missing k8s resources/metricsets {pull}31590[31590]
- Upgrade Mongodb library in Beats to v5 {pull}31185[31185]

*Packetbeat*

Expand Down
1,236 changes: 1,029 additions & 207 deletions NOTICE.txt

Large diffs are not rendered by default.

10 changes: 7 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,6 @@ require (
google.golang.org/protobuf v1.28.0
gopkg.in/inf.v0 v0.9.1
gopkg.in/jcmturner/gokrb5.v7 v7.5.0
gopkg.in/mgo.v2 v2.0.0-20160818020120-3f83fa500528
gopkg.in/yaml.v2 v2.4.0
gotest.tools v2.2.0+incompatible
gotest.tools/gotestsum v1.7.0
Expand All @@ -167,6 +166,7 @@ require (
go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0
go.elastic.co/apm/module/apmhttp/v2 v2.0.0
go.elastic.co/apm/v2 v2.0.0
go.mongodb.org/mongo-driver v1.5.1
)

require (
Expand Down Expand Up @@ -194,6 +194,7 @@ require (
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4 // indirect
github.com/cncf/xds/go v0.0.0-20211130200136-a8f946100490 // indirect
github.com/containerd/containerd v1.5.7 // indirect
github.com/cyphar/filepath-securejoin v0.2.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davecgh/go-xdr v0.0.0-20161123171359-e6a2ba005892 // indirect
Expand All @@ -213,6 +214,7 @@ require (
github.com/go-logr/logr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-sourcemap/sourcemap v2.1.2+incompatible // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gobuffalo/here v0.6.0 // indirect
github.com/godbus/dbus/v5 v5.0.6 // indirect
github.com/golang-jwt/jwt/v4 v4.0.0 // indirect
Expand Down Expand Up @@ -273,7 +275,11 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/urso/diag v0.0.0-20200210123136-21b3cc8eb797 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.0.2 // indirect
github.com/xdg-go/stringprep v1.0.2 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
github.com/yuin/gopher-lua v0.0.0-20170403160031-b402f3114ec7 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
Expand All @@ -295,8 +301,6 @@ require (
sigs.k8s.io/yaml v1.2.0 // indirect
)

require github.com/containerd/containerd v1.6.1 // indirect

replace (
github.com/Microsoft/go-winio => github.com/bi-zone/go-winio v0.4.15
github.com/Shopify/sarama => github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3
Expand Down
168 changes: 12 additions & 156 deletions go.sum

Large diffs are not rendered by default.

23 changes: 10 additions & 13 deletions metricbeat/module/mongodb/collstats/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"event": {
"dataset": "mongodb.collstats",
"duration": 115000,
"module": "mongodb"
},
"metricset": {
"name": "collstats"
"name": "collstats",
"period": 10000
},
"mongodb": {
"collstats": {
"collection": "startup_log",
"collection": "oplog.rs",
"commands": {
"count": 0,
"time": {
Expand All @@ -36,19 +33,19 @@
},
"lock": {
"read": {
"count": 74,
"count": 1,
"time": {
"us": 443
"us": 7
}
},
"write": {
"count": 1,
"time": {
"us": 8
"us": 6600
}
}
},
"name": "local.startup_log",
"name": "local.oplog.rs",
"queries": {
"count": 0,
"time": {
Expand All @@ -62,9 +59,9 @@
}
},
"total": {
"count": 75,
"count": 2,
"time": {
"us": 451
"us": 6607
}
},
"update": {
Expand All @@ -76,7 +73,7 @@
}
},
"service": {
"address": "172.26.0.2:27017",
"address": "172.28.0.5:27017",
"type": "mongodb"
}
}
71 changes: 43 additions & 28 deletions metricbeat/module/mongodb/collstats/collstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
package collstats

import (
"github.com/pkg/errors"
"context"
"errors"
"fmt"

"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/metricbeat/module/mongodb"
"github.com/elastic/elastic-agent-libs/mapstr"

"go.mongodb.org/mongo-driver/bson"
)

func init() {
Expand All @@ -32,74 +35,86 @@ func init() {
)
}

// MetricSet type defines all fields of the MetricSet
// Metricset type defines all fields of the Metricset
// As a minimum it must inherit the mb.BaseMetricSet fields, but can be extended with
// additional entries. These variables can be used to persist data or configuration between
// multiple fetch calls.
type MetricSet struct {
*mongodb.MetricSet
type Metricset struct {
*mongodb.Metricset
}

// New creates a new instance of the MetricSet
// New creates a new instance of the Metricset
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
ms, err := mongodb.NewMetricSet(base)
ms, err := mongodb.NewMetricset(base)
if err != nil {
return nil, err
return nil, fmt.Errorf("could not create mongodb metricset: %w", err)
}
return &MetricSet{ms}, nil

return &Metricset{ms}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
// instantiate direct connections to each of the configured Mongo hosts
mongoSession, err := mongodb.NewDirectSession(m.DialInfo)
func (m *Metricset) Fetch(reporter mb.ReporterV2) error {
client, err := mongodb.NewClient(m.Metricset.Config, m.Module().Config().Timeout, 0)
if err != nil {
return errors.Wrap(err, "error creating new Session")
return fmt.Errorf("could not create mongodb client: %w", err)
}
defer mongoSession.Close()

result := mapstr.M{}
defer func() {
if disconnectErr := client.Disconnect(context.Background()); disconnectErr != nil {
m.Logger().Warn("client disconnection did not happen gracefully")
}
}()

err = mongoSession.Run("top", &result)
if err != nil {
return errors.Wrap(err, "Error retrieving collection totals from Mongo instance")
return fmt.Errorf("could not get a list of databases: %w", err)
}

// This info is only stored in 'admin' database
db := client.Database("admin")
res := db.RunCommand(context.Background(), bson.D{bson.E{Key: "top"}})
if err = res.Err(); err != nil {
return fmt.Errorf("'top' command failed: %w", err)
}

var result map[string]interface{}
if err = res.Decode(&result); err != nil {
return fmt.Errorf("could not decode mongo response: %w", err)
}

if _, ok := result["totals"]; !ok {
return errors.New("Error accessing collection totals in returned data")
return errors.New("collection 'totals' key not found in mongodb response")
}

totals, ok := result["totals"].(mapstr.M)
totals, ok := result["totals"].(map[string]interface{})
if !ok {
return errors.New("Collection totals are not a map")
return errors.New("collection 'totals' are not a map")
}

for group, info := range totals {
if group == "note" {
continue
}

infoMap, ok := info.(mapstr.M)
infoMap, ok := info.(map[string]interface{})
if !ok {
err = errors.New("Unexpected data returned by mongodb")
reporter.Error(err)
m.Logger().Error(err)
reporter.Error(errors.New("unexpected data returned by mongodb"))
continue
}

event, err := eventMapping(group, infoMap)
if err != nil {
err = errors.Wrap(err, "Mapping of the event data filed")
reporter.Error(err)
m.Logger().Error(err)
reporter.Error(fmt.Errorf("mapping of the event data filed: %w", err))
continue
}

reporter.Event(mb.Event{MetricSetFields: event})
reporter.Event(mb.Event{
MetricSetFields: event,
})
}

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestData(t *testing.T) {

f := mbtest.NewReportingMetricSetV2Error(t, getConfig(service.Host()))
if err := mbtest.WriteEventsReporterV2Error(f, t, ""); err != nil {
t.Fatal("write", err)
t.Fatal("error trying to create data.json file:", err)
}
}

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/mongodb/collstats/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func eventMapping(key string, data mapstr.M) (mapstr.M, error) {
names := strings.SplitN(key, ".", 2)

if len(names) < 2 {
return nil, errors.New("Collection name invalid")
return nil, errors.New("collection name invalid")
}

event := mapstr.M{
Expand Down
5 changes: 4 additions & 1 deletion metricbeat/module/mongodb/collstats/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ func TestEventMapping(t *testing.T) {
assert.NoError(t, err)

data := mapstr.M{}
json.Unmarshal(content, &data)
err = json.Unmarshal(content, &data)
if err != nil {
t.Fatal(err)
}

event, _ := eventMapping("unit.test", data)

Expand Down
26 changes: 11 additions & 15 deletions metricbeat/module/mongodb/dbstats/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,42 +1,38 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"event": {
"dataset": "mongodb.dbstats",
"duration": 115000,
"module": "mongodb"
},
"metricset": {
"name": "dbstats"
"name": "dbstats",
"period": 10000
},
"mongodb": {
"dbstats": {
"avg_obj_size": {
"bytes": 741
"bytes": 59
},
"collections": 2,
"collections": 1,
"data_size": {
"bytes": 1482
"bytes": 59
},
"db": "local",
"db": "admin",
"file_size": {},
"index_size": {
"bytes": 32768
"bytes": 20480
},
"indexes": 2,
"indexes": 1,
"ns_size_mb": {},
"num_extents": 0,
"objects": 2,
"objects": 1,
"storage_size": {
"bytes": 32768
"bytes": 20480
}
}
},
"service": {
"address": "172.26.0.2:27017",
"address": "172.28.0.2:27017",
"type": "mongodb"
}
}
Loading

0 comments on commit 49ae09a

Please sign in to comment.