Skip to content

Commit

Permalink
Add support for cloudfoundry metrics collection using consumer v1 API (
Browse files Browse the repository at this point in the history
…elastic#19268)

As previously done for the Filebeat input, add support for metrics collection
in the Metricbeat module using the consumer v1 API. It reuses the same
Doppler consumer used in Filebeat.

v2 API is still supported, and can be selected by adding `version: v2` to the
configuration, v1 is used by default as is in principle more reliable.
  • Loading branch information
jsoriano authored Jun 24, 2020
1 parent 1498938 commit 5645491
Show file tree
Hide file tree
Showing 16 changed files with 558 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add memory metrics into compute googlecloud. {pull}18802[18802]
- Add new fields to HAProxy module. {issue}18523[18523]
- Add Tomcat overview dashboard {pull}14026[14026]
- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268]

*Packetbeat*

Expand Down
9 changes: 8 additions & 1 deletion metricbeat/docs/modules/cloudfoundry.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down Expand Up @@ -130,6 +136,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
----

[float]
Expand Down
8 changes: 5 additions & 3 deletions x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF
}
cb(event)
case err := <-errChan:
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
if err != nil {
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
}
case <-c.stop:
return
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1

#----------------------------- CockroachDB Module -----------------------------
- module: cockroachdb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down
126 changes: 14 additions & 112 deletions x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
package cloudfoundry

import (
"context"
"sync"
"fmt"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -16,26 +15,18 @@ import (
// ModuleName is the name of this module.
const ModuleName = "cloudfoundry"

type Module struct {
mb.BaseModule

log *logp.Logger

hub *cfcommon.Hub
listener *cfcommon.RlpListener
listenerLock sync.Mutex

counterReporter mb.PushReporterV2
valueReporter mb.PushReporterV2
containerReporter mb.PushReporterV2
}

func init() {
if err := mb.Registry.AddModule(ModuleName, newModule); err != nil {
panic(err)
}
}

type Module interface {
RunCounterReporter(mb.PushReporterV2)
RunContainerReporter(mb.PushReporterV2)
RunValueReporter(mb.PushReporterV2)
}

func newModule(base mb.BaseModule) (mb.Module, error) {
var cfg cfcommon.Config
if err := base.UnpackConfig(&cfg); err != nil {
Expand All @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) {
log := logp.NewLogger("cloudfoundry")
hub := cfcommon.NewHub(&cfg, "metricbeat", log)

// early check that listener can be created
_, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{})
if err != nil {
return nil, err

}

return &Module{
BaseModule: base,
log: log,
hub: hub,
}, nil
}

func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(reporter, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(nil, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunValueReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, reporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, nil, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, reporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, nil)
m.listenerLock.Unlock()
}

func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) {
if m.listener != nil {
m.listener.Stop()
m.listener = nil
}
m.counterReporter = counterReporter
m.valueReporter = valueReporter
m.containerReporter = containerReporter

start := false
callbacks := cfcommon.RlpListenerCallbacks{}
if m.counterReporter != nil {
start = true
callbacks.Counter = func(evt *cfcommon.EventCounter) {
m.counterReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.valueReporter != nil {
start = true
callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) {
m.valueReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.containerReporter != nil {
start = true
callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) {
m.containerReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if start {
l, err := m.hub.RlpListener(callbacks)
if err != nil {
m.log.Errorf("failed to create RlpListener: %v", err)
return
}
l.Start(context.Background())
m.listener = l
switch cfg.Version {
case cfcommon.ConsumerVersionV1:
return newModuleV1(base, hub, log)
case cfcommon.ConsumerVersionV2:
return newModuleV2(base, hub, log)
default:
return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version)
}
}
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "container")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "counter")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
Expand Down
Loading

0 comments on commit 5645491

Please sign in to comment.