Skip to content

Commit

Permalink
Metricbeat logstash module: accept override cluster UUID from Logst…
Browse files Browse the repository at this point in the history
…ash (#15795)

* Fetch override cluster UUID from Logstash node pipelines API and use it

* Parse override cluster UUID from Logstash API response and make node_stats use it

* Adding CHANGELOG entry

* Use override cluster UUID as fallback to ES vertex cluster UUID

* Fixing CHANGELOG entry language

* Adding godoc for new helper function
  • Loading branch information
ycombinator authored Feb 5, 2020
1 parent 49b0eb9 commit 2a6d88f
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support for processors in light modules. {issue}14740[14740] {pull}15923[15923]
- Add collecting AuroraDB metrics in rds metricset. {issue}14142[14142] {pull}16004[16004]
- Reuse connections in SQL module. {pull}16001[16001]
- Improve the `logstash` module (when `xpack.enabled` is set to `true`) to use the override `cluster_uuid` returned by Logstash APIs. {issue}15772[15772] {pull}15795[15795]

*Packetbeat*

Expand Down
35 changes: 30 additions & 5 deletions metricbeat/module/logstash/logstash.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,24 @@ func NewMetricSet(base mb.BaseMetricSet) (*MetricSet, error) {
}, nil
}

// GetPipelines returns the list of pipelines running on a Logstash node
func GetPipelines(m *MetricSet) ([]PipelineState, error) {
// GetPipelines returns the list of pipelines running on a Logstash node and,
// optionally, an override cluster UUID.
func GetPipelines(m *MetricSet) ([]PipelineState, string, error) {
content, err := fetchPath(m.HTTP, "_node/pipelines", "graph=true")
if err != nil {
return nil, errors.Wrap(err, "could not fetch node pipelines")
return nil, "", errors.Wrap(err, "could not fetch node pipelines")
}

pipelinesResponse := struct {
Monitoring struct {
ClusterID string `json:"cluster_uuid"`
} `json:"monitoring"`
Pipelines map[string]PipelineState `json:"pipelines"`
}{}

err = json.Unmarshal(content, &pipelinesResponse)
if err != nil {
return nil, errors.Wrap(err, "could not parse node pipelines response")
return nil, "", errors.Wrap(err, "could not parse node pipelines response")
}

var pipelines []PipelineState
Expand All @@ -156,7 +160,7 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) {
pipelines = append(pipelines, pipeline)
}

return pipelines, nil
return pipelines, pipelinesResponse.Monitoring.ClusterID, nil
}

// CheckPipelineGraphAPIsAvailable returns an error if pipeline graph APIs are not
Expand All @@ -177,6 +181,27 @@ func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error {
return nil
}

// GetVertexClusterUUID returns the correct cluster UUID value for the given Elasticsearch
// vertex from a Logstash pipeline. If the vertex has no cluster UUID associated with it,
// the given override cluster UUID is returned.
func GetVertexClusterUUID(vertex map[string]interface{}, overrideClusterUUID string) string {
c, ok := vertex["cluster_uuid"]
if !ok {
return overrideClusterUUID
}

clusterUUID, ok := c.(string)
if !ok {
return overrideClusterUUID
}

if clusterUUID == "" {
return overrideClusterUUID
}

return clusterUUID
}

func (m *MetricSet) getVersion() (*common.Version, error) {
const rootPath = "/"
content, err := fetchPath(m.HTTP, rootPath, "")
Expand Down
59 changes: 59 additions & 0 deletions metricbeat/module/logstash/logstash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package logstash

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGetVertexClusterUUID(t *testing.T) {
tests := map[string]struct {
vertex map[string]interface{}
overrideClusterUUID string
expectedClusterUUID string
}{
"vertex_and_override": {
map[string]interface{}{
"cluster_uuid": "v",
},
"o",
"v",
},
"vertex_only": {
vertex: map[string]interface{}{
"cluster_uuid": "v",
},
expectedClusterUUID: "v",
},
"override_only": {
overrideClusterUUID: "o",
expectedClusterUUID: "o",
},
"none": {
expectedClusterUUID: "",
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, test.expectedClusterUUID, GetVertexClusterUUID(test.vertex, test.overrideClusterUUID))
})
}
}
19 changes: 6 additions & 13 deletions metricbeat/module/logstash/node/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"github.com/elastic/beats/metricbeat/module/logstash"
)

func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState) error {
func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.PipelineState, overrideClusterUUID string) error {
pipelines = getUserDefinedPipelines(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, overrideClusterUUID)
for clusterUUID, pipelines := range clusterToPipelinesMap {
for _, pipeline := range pipelines {
removeClusterUUIDsFromPipeline(pipeline)
Expand Down Expand Up @@ -62,24 +62,17 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, pipelines []logstash.Pipel
return nil
}

func makeClusterToPipelinesMap(pipelines []logstash.PipelineState) map[string][]logstash.PipelineState {
func makeClusterToPipelinesMap(pipelines []logstash.PipelineState, overrideClusterUUID string) map[string][]logstash.PipelineState {
var clusterToPipelinesMap map[string][]logstash.PipelineState
clusterToPipelinesMap = make(map[string][]logstash.PipelineState)

for _, pipeline := range pipelines {
var clusterUUIDs []string
for _, vertex := range pipeline.Graph.Graph.Vertices {
c, ok := vertex["cluster_uuid"]
if !ok {
continue
}

clusterUUID, ok := c.(string)
if !ok {
continue
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

// If no cluster UUID was found in this pipeline, assign it a blank one
Expand Down
4 changes: 2 additions & 2 deletions metricbeat/module/logstash/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error {
return eventMapping(r, content)
}

pipelinesContent, err := logstash.GetPipelines(m.MetricSet)
pipelinesContent, overrideClusterUUID, err := logstash.GetPipelines(m.MetricSet)
if err != nil {
m.Logger().Error(err)
return nil
}

err = eventMappingXPack(r, m, pipelinesContent)
err = eventMappingXPack(r, m, pipelinesContent, overrideClusterUUID)
if err != nil {
m.Logger().Error(err)
}
Expand Down
27 changes: 15 additions & 12 deletions metricbeat/module/logstash/node_stats/data_xpack.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"encoding/json"
"time"

"github.com/elastic/beats/metricbeat/module/logstash"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
Expand Down Expand Up @@ -94,6 +96,9 @@ type nodeInfo struct {
Status string `json:"status"`
HTTPAddress string `json:"http_address"`
Pipeline pipeline `json:"pipeline"`
Monitoring struct {
ClusterID string `json:"cluster_uuid"`
} `json:"monitoring"`
}

type reloads struct {
Expand Down Expand Up @@ -166,7 +171,7 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
}

pipelines = getUserDefinedPipelines(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines)
clusterToPipelinesMap := makeClusterToPipelinesMap(pipelines, nodeStats.Monitoring.ClusterID)

for clusterUUID, clusterPipelines := range clusterToPipelinesMap {
logstashStats := LogstashStats{
Expand Down Expand Up @@ -197,24 +202,22 @@ func eventMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) error {
return nil
}

func makeClusterToPipelinesMap(pipelines []PipelineStats) map[string][]PipelineStats {
func makeClusterToPipelinesMap(pipelines []PipelineStats, overrideClusterUUID string) map[string][]PipelineStats {
var clusterToPipelinesMap map[string][]PipelineStats
clusterToPipelinesMap = make(map[string][]PipelineStats)

if overrideClusterUUID != "" {
clusterToPipelinesMap[overrideClusterUUID] = pipelines
return clusterToPipelinesMap
}

for _, pipeline := range pipelines {
var clusterUUIDs []string
for _, vertex := range pipeline.Vertices {
c, ok := vertex["cluster_uuid"]
if !ok {
continue
clusterUUID := logstash.GetVertexClusterUUID(vertex, overrideClusterUUID)
if clusterUUID != "" {
clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

clusterUUID, ok := c.(string)
if !ok {
continue
}

clusterUUIDs = append(clusterUUIDs, clusterUUID)
}

// If no cluster UUID was found in this pipeline, assign it a blank one
Expand Down

0 comments on commit 2a6d88f

Please sign in to comment.