Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat] Use timestamp from CloudWatch for events #21498

Merged
merged 4 commits into from
Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix remote_write flaky test. {pull}21173[21173]
- Visualization title fixes in aws, azure and googlecloud compute dashboards. {pull}21098[21098]
- Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378]
- Use timestamp from CloudWatch API when creating events. {pull}21498[21498]

*Packetbeat*

Expand Down
13 changes: 8 additions & 5 deletions x-pack/metricbeat/module/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,14 @@ func StringInSlice(str string, list []string) (bool, int) {
}

// InitEvent initialize mb.Event with basic information like service.name, cloud.provider
func InitEvent(regionName string, accountName string, accountID string) mb.Event {
event := mb.Event{}
event.MetricSetFields = common.MapStr{}
event.ModuleFields = common.MapStr{}
event.RootFields = common.MapStr{}
func InitEvent(regionName string, accountName string, accountID string, timestamp time.Time) mb.Event {
event := mb.Event{
Timestamp: timestamp,
MetricSetFields: common.MapStr{},
ModuleFields: common.MapStr{},
RootFields: common.MapStr{},
}

event.RootFields.Put("cloud.provider", "aws")
if regionName != "" {
event.RootFields.Put("cloud.region", regionName)
Expand Down
38 changes: 20 additions & 18 deletions x-pack/metricbeat/module/aws/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,28 @@ func (m *MetricSet) getCloudWatchBillingMetrics(

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataOutput)
if !timestamp.IsZero() {
for _, output := range metricDataOutput {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if timestamp.IsZero() {
return nil
}

for _, output := range metricDataOutput {
if len(output.Values) == 0 {
continue
}
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)

event := aws.InitEvent("", m.AccountName, m.AccountID)
event.MetricSetFields.Put(labels[0], output.Values[timestampIdx])
event := aws.InitEvent("", m.AccountName, m.AccountID, timestamp)
event.MetricSetFields.Put(labels[0], output.Values[timestampIdx])

i := 1
for i < len(labels)-1 {
event.MetricSetFields.Put(labels[i], labels[i+1])
i += 2
}
event.Timestamp = endTime
events = append(events, event)
i := 1
for i < len(labels)-1 {
event.MetricSetFields.Put(labels[i], labels[i+1])
i += 2
}
event.Timestamp = endTime
events = append(events, event)
}
}
return events
Expand Down Expand Up @@ -278,7 +280,7 @@ func (m *MetricSet) getCostGroupBy(svcCostExplorer costexploreriface.ClientAPI,
}

func (m *MetricSet) addCostMetrics(metrics map[string]costexplorer.MetricValue, groupDefinition costexplorer.GroupDefinition, startDate string, endDate string) mb.Event {
event := aws.InitEvent("", m.AccountName, m.AccountID)
event := aws.InitEvent("", m.AccountName, m.AccountID, time.Now())

// add group definition
event.MetricSetFields.Put("group_definition", common.MapStr{
Expand Down
105 changes: 52 additions & 53 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,34 +499,35 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes

// Find a timestamp for all metrics in output
timestamp := aws.FindTimestamp(metricDataResults)
if timestamp.IsZero() {
return nil, nil
}

// Create events when there is no tags_filter or resource_type specified.
if len(resourceTypeTagFilters) == 0 {
if !timestamp.IsZero() {
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
}
}
return events, nil
Expand Down Expand Up @@ -556,45 +557,43 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes
m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier)
}

if !timestamp.IsZero() {
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}
for _, output := range metricDataResults {
if len(output.Values) == 0 {
continue
}

exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}

// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
}
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps)
if exists {
labels := strings.Split(*output.Label, labelSeparator)
if len(labels) != 5 {
// if there is no tag in labels but there is a tagsFilter, then no event should be reported.
if len(tagsFilter) != 0 {
continue
}

identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID)
// when there is no identifier value in label, use region+accountID+namespace instead
identifier := regionName + m.AccountID + labels[namespaceIdx]
if _, ok := events[identifier]; !ok {
events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)
events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels)
continue
}

// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
identifierValue := labels[identifierValueIdx]
if _, ok := events[identifierValue]; !ok {
// when tagsFilter is not empty but no entry in
// resourceTagMap for this identifier, do not initialize
// an event for this identifier.
if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil {
continue
}
events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp)
}
events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels)

// add tags to event based on identifierValue
insertTags(events, identifierValue, resourceTagMap)
}
}
}
Expand Down
39 changes: 32 additions & 7 deletions x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import (
"testing"
"time"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface"
Expand All @@ -22,12 +19,14 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/metricbeat/mb"
"github.com/elastic/beats/v7/x-pack/metricbeat/module/aws"
)

var (
regionName = "us-west-1"
timestamp = time.Now()
timestamp = time.Date(2020, 10, 06, 00, 00, 00, 0, time.UTC)
accountID = "123456789012"
accountName = "test"

Expand Down Expand Up @@ -1466,9 +1465,9 @@ func TestInsertTags(t *testing.T) {
tagValue3 := "dev"

events := map[string]mb.Event{}
events[identifier1] = aws.InitEvent(regionName, accountName, accountID)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID)
events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID)
events[identifier1] = aws.InitEvent(regionName, accountName, accountID, timestamp)
events[identifier2] = aws.InitEvent(regionName, accountName, accountID, timestamp)
events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID, timestamp)

resourceTagMap := map[string][]resourcegroupstaggingapi.Tag{}
resourceTagMap["test-s3-1"] = []resourcegroupstaggingapi.Tag{
Expand Down Expand Up @@ -1569,3 +1568,29 @@ func TestConfigDimensionValueContainsWildcard(t *testing.T) {
})
}
}

func TestCreateEventsTimestamp(t *testing.T) {
m := MetricSet{
logger: logp.NewLogger("test"),
CloudwatchConfigs: []Config{{Statistic: []string{"Average"}}},
MetricSet: &aws.MetricSet{Period: 5, AccountID: accountID},
}

listMetricWithStatsTotal := []metricsWithStatistics{
{
cloudwatch.Metric{
MetricName: awssdk.String("CPUUtilization"),
Namespace: awssdk.String("AWS/EC2"),
},
[]string{"Average"},
nil,
},
}

resourceTypeTagFilters := map[string][]aws.Tag{}
startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency)

events, err := m.createEvents(&MockCloudWatchClientWithoutDim{}, &MockResourceGroupsTaggingClient{}, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime)
assert.NoError(t, err)
assert.Equal(t, timestamp, events[regionName+accountID+namespace].Timestamp)
}
Loading