Skip to content

Commit

Permalink
Be defensive about determining ES cluster UUID (elastic#13020)
Browse files Browse the repository at this point in the history
* Be defensive about determining ES cluster UUID

If the beat being monitored is using the Elasticsearch output, Metricbeat must be able to retrieve that Elasticsearch cluster's UUID. If it cannot, either there's a problem or the monitored beat hasn't established a connection to the Elasticsearch cluster configured in the output yet. Either way, Metricbeat should not send monitoring data about the monitored beat or else that data will not have the cluster_uuid field and therefore not get associated with the correct Elasticsearch cluster in the Stack Monitoring UI.

* Make beat/stats metricset also check for cluster UUID when monitored beat output == ES

* Also spin up Elasticsearch container

* Allow monitored Metricbeat enough time to determine its ES cluster's UUID

* Increasing sleep time

* Increasing sleep time again
  • Loading branch information
ycombinator authored Jul 30, 2019
1 parent 2c85c97 commit d1cd81a
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 5 deletions.
11 changes: 11 additions & 0 deletions metricbeat/module/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beat

import (
"encoding/json"
"fmt"
"net/url"

"github.com/pkg/errors"
Expand Down Expand Up @@ -74,6 +75,13 @@ func validateXPackMetricsets(base mb.BaseModule) error {
// ModuleName is the name of this module.
const ModuleName = "beat"

// ErrClusterUUID is the sentinel error reported when the monitored beat is
// configured with the Elasticsearch output but the UUID of that Elasticsearch
// cluster cannot be determined, e.g. because the beat has not connected to the
// cluster yet or is having trouble connecting to it.
var ErrClusterUUID = fmt.Errorf("monitored beat is using Elasticsearch output but cluster UUID cannot be determined")

// Info construct contains the relevant data from the Beat's / endpoint
type Info struct {
UUID string `json:"uuid"`
Expand All @@ -85,6 +93,9 @@ type Info struct {

// State construct contains the relevant data from the Beat's /state endpoint
type State struct {
Output struct {
Name string `json:"name"`
} `json:"output"`
Outputs struct {
Elasticsearch struct {
ClusterUUID string `json:"cluster_uuid"`
Expand Down
39 changes: 36 additions & 3 deletions metricbeat/module/beat/state/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/module/beat"
b "github.com/elastic/beats/metricbeat/module/beat"
)

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info beat.Info, content []byte) error {
func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info b.Info, content []byte) error {
now := time.Now()

// Massage info into beat
Expand All @@ -54,7 +54,16 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, info beat.Info, content []
"timestamp": now,
}

clusterUUID := getClusterUUID(state)
var clusterUUID string
if isOutputES(state) {
clusterUUID = getClusterUUID(state)
if clusterUUID == "" {
// Output is ES but cluster UUID could not be determined. No point sending monitoring
// data with empty cluster UUID since it will not be associated with the correct ES
// production cluster. Log error instead.
return errors.Wrap(b.ErrClusterUUID, "could not determine cluster UUID")
}
}

var event mb.Event
event.RootFields = common.MapStr{
Expand Down Expand Up @@ -108,3 +117,27 @@ func getClusterUUID(state map[string]interface{}) string {

return clusterUUID
}

// isOutputES reports whether the decoded state document names "elasticsearch"
// as its output, i.e. whether state["output"]["name"] == "elasticsearch".
//
// It returns false when the "output" key is missing or is not a
// map[string]interface{}, when the nested "name" key is missing or is not a
// string, or when the name is any other value.
func isOutputES(state map[string]interface{}) bool {
	// A comma-ok assertion on a missing key (nil interface) simply fails, so
	// absence and wrong type are handled by the same check.
	section, isMap := state["output"].(map[string]interface{})
	if !isMap {
		return false
	}

	outputName, isString := section["name"].(string)
	return isString && outputName == "elasticsearch"
}
15 changes: 14 additions & 1 deletion metricbeat/module/beat/stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,5 +83,18 @@ func (m *MetricSet) getClusterUUID() (string, error) {
return "", errors.Wrap(err, "could not get state information")
}

return state.Outputs.Elasticsearch.ClusterUUID, nil
if state.Output.Name != "elasticsearch" {
return "", nil
}

clusterUUID := state.Outputs.Elasticsearch.ClusterUUID
if clusterUUID == "" {
// Output is ES but cluster UUID could not be determined. No point sending monitoring
// data with empty cluster UUID since it will not be associated with the correct ES
// production cluster. Log error instead.
return "", beat.ErrClusterUUID
}

return clusterUUID, nil

}
9 changes: 8 additions & 1 deletion metricbeat/tests/system/test_beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class Test(metricbeat.BaseTest):

COMPOSE_SERVICES = ['metricbeat']
COMPOSE_SERVICES = ['metricbeat', 'elasticsearch']
FIELDS = ['beat']
METRICSETS = ['stats', 'state']

Expand All @@ -34,6 +34,13 @@ def test_xpack(self):
}
}])

# Give the monitored Metricbeat instance enough time to collect metrics and index them
# into Elasticsearch, so it may establish the connection to Elasticsearch and determine
# its cluster UUID in the process. Otherwise, the monitoring Metricbeat instance will
# show errors in its log about not being able to determine the Elasticsearch cluster UUID
# to be associated with the monitored Metricbeat instance.
time.sleep(30)

proc = self.start_beat()
self.wait_until(lambda: self.output_lines() > 0)
proc.check_kill_and_wait()
Expand Down

0 comments on commit d1cd81a

Please sign in to comment.