Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #19268 to 7.x: Add support for cloudfoundry metrics collection using consumer v1 API #19372

Merged
merged 1 commit into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ field. You can revert this change by configuring tags for the module and omittin
- Add memory metrics into compute googlecloud. {pull}18802[18802]
- Add new fields to HAProxy module. {issue}18523[18523]
- Add Tomcat overview dashboard {pull}14026[14026]
- Add support for v1 consumer API in Cloud Foundry module, use it by default. {pull}19268[19268]

*Packetbeat*

Expand Down
9 changes: 8 additions & 1 deletion metricbeat/docs/modules/cloudfoundry.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -103,6 +103,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down Expand Up @@ -130,6 +136,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
----

[float]
Expand Down
8 changes: 5 additions & 3 deletions x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,11 @@ func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeF
}
cb(event)
case err := <-errChan:
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
if err != nil {
// This error is an error on the connection, not a cloud foundry
// error envelope. Firehose should be able to reconnect, so just log it.
c.log.Infof("Error received on firehose: %v", err)
}
case <-c.stop:
return
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ metricbeat.modules:
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1

#----------------------------- CockroachDB Module -----------------------------
- module: cockroachdb
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
rlp_address: '${CLOUDFOUNDRY_RLP_ADDRESS:""}'
client_id: '${CLOUDFOUNDRY_CLIENT_ID:""}'
client_secret: '${CLOUDFOUNDRY_CLIENT_SECRET:""}'
version: v1
8 changes: 7 additions & 1 deletion x-pack/metricbeat/module/cloudfoundry/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ The URL of the Cloud Foundry UAA API. Optional. Default: "(value from ${api_addr
[float]
=== `rlp_address`

The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(value from ${api_address}/v2/info)".
The URL of the Cloud Foundry RLP Gateway. Optional. Default: "(`log-stream` subdomain under the same domain as `api_server`)".

[float]
=== `client_id`
Expand All @@ -93,6 +93,12 @@ Client Secret to authenticate with Cloud Foundry. Default: "".
Shard ID for connection to the RLP Gateway. Use the same ID across multiple {beatname_lc} to shard the load of events
from the RLP Gateway. Default: "(generated UUID)".

[float]
==== `version`

Consumer API version to connect with Cloud Foundry to collect events. Use `v1` to collect events using Doppler/Traffic Control.
Use `v2` to collect events from the RLP Gateway. Default: "`v1`".

[float]
=== `ssl`

Expand Down
126 changes: 14 additions & 112 deletions x-pack/metricbeat/module/cloudfoundry/cloudfoundry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
package cloudfoundry

import (
"context"
"sync"
"fmt"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
Expand All @@ -16,26 +15,18 @@ import (
// ModuleName is the name of this module.
const ModuleName = "cloudfoundry"

type Module struct {
mb.BaseModule

log *logp.Logger

hub *cfcommon.Hub
listener *cfcommon.RlpListener
listenerLock sync.Mutex

counterReporter mb.PushReporterV2
valueReporter mb.PushReporterV2
containerReporter mb.PushReporterV2
}

func init() {
if err := mb.Registry.AddModule(ModuleName, newModule); err != nil {
panic(err)
}
}

type Module interface {
RunCounterReporter(mb.PushReporterV2)
RunContainerReporter(mb.PushReporterV2)
RunValueReporter(mb.PushReporterV2)
}

func newModule(base mb.BaseModule) (mb.Module, error) {
var cfg cfcommon.Config
if err := base.UnpackConfig(&cfg); err != nil {
Expand All @@ -45,101 +36,12 @@ func newModule(base mb.BaseModule) (mb.Module, error) {
log := logp.NewLogger("cloudfoundry")
hub := cfcommon.NewHub(&cfg, "metricbeat", log)

// early check that listener can be created
_, err := hub.RlpListener(cfcommon.RlpListenerCallbacks{})
if err != nil {
return nil, err

}

return &Module{
BaseModule: base,
log: log,
hub: hub,
}, nil
}

func (m *Module) RunCounterReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(reporter, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(nil, m.valueReporter, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunValueReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, reporter, m.containerReporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, nil, m.containerReporter)
m.listenerLock.Unlock()
}

func (m *Module) RunContainerReporter(reporter mb.PushReporterV2) {
m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, reporter)
m.listenerLock.Unlock()

<-reporter.Done()

m.listenerLock.Lock()
m.runReporters(m.counterReporter, m.valueReporter, nil)
m.listenerLock.Unlock()
}

func (m *Module) runReporters(counterReporter, valueReporter, containerReporter mb.PushReporterV2) {
if m.listener != nil {
m.listener.Stop()
m.listener = nil
}
m.counterReporter = counterReporter
m.valueReporter = valueReporter
m.containerReporter = containerReporter

start := false
callbacks := cfcommon.RlpListenerCallbacks{}
if m.counterReporter != nil {
start = true
callbacks.Counter = func(evt *cfcommon.EventCounter) {
m.counterReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.valueReporter != nil {
start = true
callbacks.ValueMetric = func(evt *cfcommon.EventValueMetric) {
m.valueReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if m.containerReporter != nil {
start = true
callbacks.ContainerMetric = func(evt *cfcommon.EventContainerMetric) {
m.containerReporter.Event(mb.Event{
Timestamp: evt.Timestamp(),
RootFields: evt.ToFields(),
})
}
}
if start {
l, err := m.hub.RlpListener(callbacks)
if err != nil {
m.log.Errorf("failed to create RlpListener: %v", err)
return
}
l.Start(context.Background())
m.listener = l
switch cfg.Version {
case cfcommon.ConsumerVersionV1:
return newModuleV1(base, hub, log)
case cfcommon.ConsumerVersionV2:
return newModuleV2(base, hub, log)
default:
return nil, fmt.Errorf("not supported consumer version: %s", cfg.Version)
}
}
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "container")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(60*time.Second, 1, ms)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/metricbeat/module/cloudfoundry/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet

mod *cloudfoundry.Module
mod cloudfoundry.Module
}

// New create a new instance of the MetricSet
// Part of new is also setting up the configuration by processing additional
// configuration entries if needed.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
mod, ok := base.Module().(*cloudfoundry.Module)
mod, ok := base.Module().(cloudfoundry.Module)
if !ok {
return nil, fmt.Errorf("must be child of cloudfoundry module")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,26 @@ import (

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/logp"
mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/cloudfoundry/mtest"
)

func TestFetch(t *testing.T) {
logp.TestingSetup(logp.WithSelectors("cloudfoundry"))

t.Run("v1", func(t *testing.T) {
testFetch(t, "v1")
})

t.Run("v2", func(t *testing.T) {
testFetch(t, "v2")
})
}

func testFetch(t *testing.T, version string) {
config := mtest.GetConfig(t, "counter")
config["version"] = version

ms := mbtest.NewPushMetricSetV2(t, config)
events := mbtest.RunPushMetricSetV2(10*time.Second, 1, ms)
Expand Down
Loading