Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for cloudfoundry metrics collection using consumer v1 API #19268

Merged
merged 11 commits into from
Jun 24, 2020
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add memory metrics into compute googlecloud. {pull}18802[18802]
- Add new fields to HAProxy module. {issue}18523[18523]
- Add Tomcat overview dashboard {pull}14026[14026]
- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268]

*Packetbeat*

Expand Down
9 changes: 8 additions & 1 deletion metricbeat/docs/modules/cloudfoundry.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down Expand Up @@ -130,6 +136,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
----

[float]
Expand Down
8 changes: 5 additions & 3 deletions x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF
}
cb(event)
case err := <-errChan:
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
if err != nil {
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
}
case <-c.stop:
return
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1

#----------------------------- CockroachDB Module -----------------------------
- module: cockroachdb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down
126 changes: 14 additions & 112 deletions x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
package cloudfoundry

import (
"context"
"sync"
"fmt"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -16,26 +15,18 @@ import (
// ModuleName is the name of this module.
const ModuleName = "cloudfoundry"

type Module struct {
mb.BaseModule

log *logp.Logger

hub *cfcommon.Hub
listener *cfcommon.RlpListener
listenerLock sync.Mutex

counterReporter mb.PushReporterV2
valueReporter mb.PushReporterV2
containerReporter mb.PushReporterV2
}

func init() {
if err := mb.Registry.AddModule(ModuleName, newModule); err != nil {
panic(err)
}
}

type Module interface {
RunCounterReporter(mb.PushReporterV2)
RunContainerReporter(mb.PushReporterV2)
RunValueReporter(mb.PushReporterV2)
}

func newModule(base mb.BaseModule) (mb.Module, error) {
var cfg cfcommon.Config
if err := base.UnpackConfig(&cfg); err != nil {
Expand All @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) {
log := logp.NewLogger("cloudfoundry")
hub := cfcommon.NewHub(&cfg, "metricbeat", log)

// early check that listener can be created
_, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{})
if err != nil {
return nil, err

}

return &Module{
BaseModule: base,
log: log,
hub: hub,
}, nil
}

func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(reporter, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(nil, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunValueReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, reporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, nil, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, reporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, nil)
m.listenerLock.Unlock()
}

func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) {
if m.listener != nil {
m.listener.Stop()
m.listener = nil
}
m.counterReporter = counterReporter
m.valueReporter = valueReporter
m.containerReporter = containerReporter

start := false
callbacks := cfcommon.RlpListenerCallbacks{}
if m.counterReporter != nil {
start = true
callbacks.Counter = func(evt *cfcommon.EventCounter) {
m.counterReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.valueReporter != nil {
start = true
callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) {
m.valueReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.containerReporter != nil {
start = true
callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) {
m.containerReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if start {
l, err := m.hub.RlpListener(callbacks)
if err != nil {
m.log.Errorf("failed to create RlpListener: %v", err)
return
}
l.Start(context.Background())
m.listener = l
switch cfg.Version {
case cfcommon.ConsumerVersionV1:
return newModuleV1(base, hub, log)
case cfcommon.ConsumerVersionV2:
return newModuleV2(base, hub, log)
default:
return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version)
}
}
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "container")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "counter")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
Expand Down
Loading