From 4758dabfb3416bc0b0df336263f306982e980fc1 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 2 Oct 2020 14:45:35 -0600 Subject: [PATCH 1/3] Use timestamp from CloudWatch for events --- x-pack/metricbeat/module/aws/_meta/config.yml | 2 +- x-pack/metricbeat/module/aws/aws.go | 4 +- .../metricbeat/module/aws/billing/billing.go | 38 ++-- .../module/aws/cloudwatch/cloudwatch.go | 105 ++++++----- .../module/aws/cloudwatch/cloudwatch_test.go | 6 +- x-pack/metricbeat/module/aws/ec2/ec2.go | 164 +++++++++--------- x-pack/metricbeat/module/aws/rds/rds.go | 106 +++++------ .../aws/s3_daily_storage/s3_daily_storage.go | 3 +- .../module/aws/s3_request/s3_request.go | 3 +- x-pack/metricbeat/module/aws/sqs/sqs.go | 7 +- x-pack/metricbeat/modules.d/aws.yml.disabled | 2 +- 11 files changed, 222 insertions(+), 218 deletions(-) diff --git a/x-pack/metricbeat/module/aws/_meta/config.yml b/x-pack/metricbeat/module/aws/_meta/config.yml index 618ed4cd854..f167b464ab3 100644 --- a/x-pack/metricbeat/module/aws/_meta/config.yml +++ b/x-pack/metricbeat/module/aws/_meta/config.yml @@ -4,6 +4,7 @@ - elb - natgateway - rds + - s3_request - transitgateway - usage - vpn @@ -44,4 +45,3 @@ period: 24h metricsets: - s3_daily_storage - - s3_request diff --git a/x-pack/metricbeat/module/aws/aws.go b/x-pack/metricbeat/module/aws/aws.go index f7b744c27cb..57664263fd3 100644 --- a/x-pack/metricbeat/module/aws/aws.go +++ b/x-pack/metricbeat/module/aws/aws.go @@ -193,7 +193,7 @@ func StringInSlice(str string, list []string) (bool, int) { } // InitEvent initialize mb.Event with basic information like service.name, cloud.provider -func InitEvent(regionName string, accountName string, accountID string) mb.Event { +func InitEvent(regionName string, accountName string, accountID string, timestamp time.Time) mb.Event { event := mb.Event{} event.MetricSetFields = common.MapStr{} event.ModuleFields = common.MapStr{} @@ -208,6 +208,8 @@ func InitEvent(regionName string, accountName string, accountID string) mb.Event if accountID != "" { event.RootFields.Put("cloud.account.id", accountID) } + + event.Timestamp = timestamp return event } diff --git a/x-pack/metricbeat/module/aws/billing/billing.go b/x-pack/metricbeat/module/aws/billing/billing.go index 2eb2bd2854a..2ddbfff4fa6 100644 --- a/x-pack/metricbeat/module/aws/billing/billing.go +++ b/x-pack/metricbeat/module/aws/billing/billing.go @@ -178,26 +178,28 @@ func (m *MetricSet) getCloudWatchBillingMetrics( // Find a timestamp for all metrics in output timestamp := aws.FindTimestamp(metricDataOutput) - if !timestamp.IsZero() { - for _, output := range metricDataOutput { - if len(output.Values) == 0 { - continue - } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, labelSeparator) + if timestamp.IsZero() { + return nil + } + + for _, output := range metricDataOutput { + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, labelSeparator) - event := aws.InitEvent("", m.AccountName, m.AccountID) - event.MetricSetFields.Put(labels[0], output.Values[timestampIdx]) + event := aws.InitEvent("", m.AccountName, m.AccountID, timestamp) + event.MetricSetFields.Put(labels[0], output.Values[timestampIdx]) - i := 1 - for i < len(labels)-1 { - event.MetricSetFields.Put(labels[i], labels[i+1]) - i += 2 - } - event.Timestamp = endTime - events = append(events, event) + i := 1 + for i < len(labels)-1 { + event.MetricSetFields.Put(labels[i], labels[i+1]) + i += 2 } + event.Timestamp = endTime + events = append(events, event) } } return events @@ -278,7 +280,7 @@ func (m *MetricSet) getCostGroupBy(svcCostExplorer costexploreriface.ClientAPI, } func (m *MetricSet) addCostMetrics(metrics map[string]costexplorer.MetricValue, groupDefinition costexplorer.GroupDefinition, startDate string, endDate string) mb.Event { - event := aws.InitEvent("", m.AccountName, m.AccountID) + event := aws.InitEvent("", m.AccountName, m.AccountID, time.Now()) // add group definition event.MetricSetFields.Put("group_definition", common.MapStr{ diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index 42d68acb3af..16f85dc5e89 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -498,34 +498,35 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes // Find a timestamp for all metrics in output timestamp := aws.FindTimestamp(metricDataResults) + if timestamp.IsZero() { + return nil, nil + } // Create events when there is no tags_filter or resource_type specified. if len(resourceTypeTagFilters) == 0 { - if !timestamp.IsZero() { - for _, output := range metricDataResults { - if len(output.Values) == 0 { - continue - } + for _, output := range metricDataResults { + if len(output.Values) == 0 { + continue + } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, labelSeparator) - if len(labels) != 5 { - // when there is no identifier value in label, use region+accountID+namespace instead - identifier := regionName + m.AccountID + labels[namespaceIdx] - if _, ok := events[identifier]; !ok { - events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID) - } - events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) - continue + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, labelSeparator) + if len(labels) != 5 { + // when there is no identifier value in label, use region+accountID+namespace instead + identifier := regionName + m.AccountID + labels[namespaceIdx] + if _, ok := events[identifier]; !ok { + events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) } + events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) + continue + } - identifierValue := labels[identifierValueIdx] - if _, ok := events[identifierValue]; !ok { - events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID) - } - events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels) + identifierValue := labels[identifierValueIdx] + if _, ok := events[identifierValue]; !ok { + events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) } + events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels) } } return events, nil @@ -555,45 +556,43 @@ func (m *MetricSet) createEvents(svcCloudwatch cloudwatchiface.ClientAPI, svcRes m.logger.Debugf("In region %s, service %s tags match tags_filter", regionName, identifier) } - if !timestamp.IsZero() { - for _, output := range metricDataResults { - if len(output.Values) == 0 { - continue - } + for _, output := range metricDataResults { + if len(output.Values) == 0 { + continue + } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, labelSeparator) - if len(labels) != 5 { - // if there is no tag in labels but there is a tagsFilter, then no event should be reported. - if len(tagsFilter) != 0 { - continue - } - - // when there is no identifier value in label, use region+accountID+namespace instead - identifier := regionName + m.AccountID + labels[namespaceIdx] - if _, ok := events[identifier]; !ok { - events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID) - } - events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, labelSeparator) + if len(labels) != 5 { + // if there is no tag in labels but there is a tagsFilter, then no event should be reported. + if len(tagsFilter) != 0 { continue } - identifierValue := labels[identifierValueIdx] - if _, ok := events[identifierValue]; !ok { - // when tagsFilter is not empty but no entry in - // resourceTagMap for this identifier, do not initialize - // an event for this identifier. - if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil { - continue - } - events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID) + // when there is no identifier value in label, use region+accountID+namespace instead + identifier := regionName + m.AccountID + labels[namespaceIdx] + if _, ok := events[identifier]; !ok { + events[identifier] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) } - events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels) + events[identifier] = insertRootFields(events[identifier], output.Values[timestampIdx], labels) + continue + } - // add tags to event based on identifierValue - insertTags(events, identifierValue, resourceTagMap) + identifierValue := labels[identifierValueIdx] + if _, ok := events[identifierValue]; !ok { + // when tagsFilter is not empty but no entry in + // resourceTagMap for this identifier, do not initialize + // an event for this identifier. + if len(tagsFilter) != 0 && resourceTagMap[identifierValue] == nil { + continue + } + events[identifierValue] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) } + events[identifierValue] = insertRootFields(events[identifierValue], output.Values[timestampIdx], labels) + + // add tags to event based on identifierValue + insertTags(events, identifierValue, resourceTagMap) } } } diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index 353ffd0e236..9c7b7ddaccc 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -1466,9 +1466,9 @@ func TestInsertTags(t *testing.T) { tagValue3 := "dev" events := map[string]mb.Event{} - events[identifier1] = aws.InitEvent(regionName, accountName, accountID) - events[identifier2] = aws.InitEvent(regionName, accountName, accountID) - events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID) + events[identifier1] = aws.InitEvent(regionName, accountName, accountID, timestamp) + events[identifier2] = aws.InitEvent(regionName, accountName, accountID, timestamp) + events[identifierContainsArn] = aws.InitEvent(regionName, accountName, accountID, timestamp) resourceTagMap := map[string][]resourcegroupstaggingapi.Tag{} resourceTagMap["test-s3-1"] = []resourcegroupstaggingapi.Tag{ diff --git a/x-pack/metricbeat/module/aws/ec2/ec2.go b/x-pack/metricbeat/module/aws/ec2/ec2.go index 36ad9a1ca02..bcbdee90401 100644 --- a/x-pack/metricbeat/module/aws/ec2/ec2.go +++ b/x-pack/metricbeat/module/aws/ec2/ec2.go @@ -174,115 +174,117 @@ func constructMetricQueries(listMetricsOutput []cloudwatch.Metric, instanceID st } func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.MetricDataResult, instanceOutput map[string]ec2.Instance, regionName string) (map[string]mb.Event, error) { + // monitoring state for each instance + monitoringStates := map[string]string{} + + // Find a timestamp for all metrics in output + timestamp := aws.FindTimestamp(getMetricDataResults) + if timestamp.IsZero() { + return nil, nil + } + // Initialize events and metricSetFieldResults per instanceID events := map[string]mb.Event{} metricSetFieldResults := map[idStat]map[string]interface{}{} for instanceID := range instanceOutput { for _, statistic := range statistics { - events[instanceID] = aws.InitEvent(regionName, m.AccountName, m.AccountID) + events[instanceID] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) metricSetFieldResults[idStat{instanceID: instanceID, statistic: statistic}] = map[string]interface{}{} } } - // monitoring state for each instance - monitoringStates := map[string]string{} + for _, output := range getMetricDataResults { + if len(output.Values) == 0 { + continue + } - // Find a timestamp for all metrics in output - timestamp := aws.FindTimestamp(getMetricDataResults) - if !timestamp.IsZero() { - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + label, err := newLabelFromJSON(*output.Label) + if err != nil { + m.logger.Errorf("convert cloudwatch MetricDataResult label failed for label = %s: %w", *output.Label, err) continue } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - label, err := newLabelFromJSON(*output.Label) - if err != nil { - m.logger.Errorf("convert cloudwatch MetricDataResult label failed for label = %s: %w", *output.Label, err) + instanceID := label.InstanceID + statistic := label.Statistic + + // Add tags + tags := instanceOutput[instanceID].Tags + if m.TagsFilter != nil { + // Check with each tag filter + // If tag filter doesn't exist in tagKeys/tagValues, + // then do not report this event/instance. + if exists := aws.CheckTagFiltersExist(m.TagsFilter, tags); !exists { + // if tag filter doesn't exist, remove this event initial + // entry to avoid report an empty event. + delete(events, instanceID) continue } + } - instanceID := label.InstanceID - statistic := label.Statistic - - // Add tags - tags := instanceOutput[instanceID].Tags - if m.TagsFilter != nil { - // Check with each tag filter - // If tag filter doesn't exist in tagKeys/tagValues, - // then do not report this event/instance. - if exists := aws.CheckTagFiltersExist(m.TagsFilter, tags); !exists { - // if tag filter doesn't exist, remove this event initial - // entry to avoid report an empty event. - delete(events, instanceID) - continue - } - } - - // By default, replace dot "." using underscore "_" for tag keys. - // Note: tag values are not dedotted. - for _, tag := range tags { - events[instanceID].ModuleFields.Put("tags."+common.DeDot(*tag.Key), *tag.Value) - // add cloud.instance.name and host.name into ec2 events - if *tag.Key == "Name" { - events[instanceID].RootFields.Put("cloud.instance.name", *tag.Value) - events[instanceID].RootFields.Put("host.name", *tag.Value) - } - } - - machineType, err := instanceOutput[instanceID].InstanceType.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.InstanceType.MarshalValue failed") + // By default, replace dot "." using underscore "_" for tag keys. + // Note: tag values are not dedotted. + for _, tag := range tags { + events[instanceID].ModuleFields.Put("tags."+common.DeDot(*tag.Key), *tag.Value) + // add cloud.instance.name and host.name into ec2 events + if *tag.Key == "Name" { + events[instanceID].RootFields.Put("cloud.instance.name", *tag.Value) + events[instanceID].RootFields.Put("host.name", *tag.Value) } + } - events[instanceID].RootFields.Put("cloud.instance.id", instanceID) - events[instanceID].RootFields.Put("cloud.machine.type", machineType) + machineType, err := instanceOutput[instanceID].InstanceType.MarshalValue() + if err != nil { + return events, errors.Wrap(err, "instance.InstanceType.MarshalValue failed") + } - placement := instanceOutput[instanceID].Placement - if placement != nil { - events[instanceID].RootFields.Put("cloud.availability_zone", *placement.AvailabilityZone) - } + events[instanceID].RootFields.Put("cloud.instance.id", instanceID) + events[instanceID].RootFields.Put("cloud.machine.type", machineType) - if len(output.Values) > timestampIdx { - metricSetFieldResults[idStat{instanceID: instanceID, statistic: statistic}][label.MetricName] = fmt.Sprint(output.Values[timestampIdx]) - } + placement := instanceOutput[instanceID].Placement + if placement != nil { + events[instanceID].RootFields.Put("cloud.availability_zone", *placement.AvailabilityZone) + } - instanceStateName, err := instanceOutput[instanceID].State.Name.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.State.Name.MarshalValue failed") - } + if len(output.Values) > timestampIdx { + metricSetFieldResults[idStat{instanceID: instanceID, statistic: statistic}][label.MetricName] = fmt.Sprint(output.Values[timestampIdx]) + } - monitoringState, err := instanceOutput[instanceID].Monitoring.State.MarshalValue() - if err != nil { - return events, errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed") - } + instanceStateName, err := instanceOutput[instanceID].State.Name.MarshalValue() + if err != nil { + return events, errors.Wrap(err, "instance.State.Name.MarshalValue failed") + } - monitoringStates[instanceID] = monitoringState + monitoringState, err := instanceOutput[instanceID].Monitoring.State.MarshalValue() + if err != nil { + return events, errors.Wrap(err, "instance.Monitoring.State.MarshalValue failed") + } - cpuOptions := instanceOutput[instanceID].CpuOptions - if cpuOptions != nil { - events[instanceID].MetricSetFields.Put("instance.core.count", *cpuOptions.CoreCount) - events[instanceID].MetricSetFields.Put("instance.threads_per_core", *cpuOptions.ThreadsPerCore) - } + monitoringStates[instanceID] = monitoringState - publicIP := instanceOutput[instanceID].PublicIpAddress - if publicIP != nil { - events[instanceID].MetricSetFields.Put("instance.public.ip", *publicIP) - } + cpuOptions := instanceOutput[instanceID].CpuOptions + if cpuOptions != nil { + events[instanceID].MetricSetFields.Put("instance.core.count", *cpuOptions.CoreCount) + events[instanceID].MetricSetFields.Put("instance.threads_per_core", *cpuOptions.ThreadsPerCore) + } - privateIP := instanceOutput[instanceID].PrivateIpAddress - if privateIP != nil { - events[instanceID].MetricSetFields.Put("instance.private.ip", *privateIP) - } + publicIP := instanceOutput[instanceID].PublicIpAddress + if publicIP != nil { + events[instanceID].MetricSetFields.Put("instance.public.ip", *publicIP) + } - events[instanceID].MetricSetFields.Put("instance.image.id", *instanceOutput[instanceID].ImageId) - events[instanceID].MetricSetFields.Put("instance.state.name", instanceStateName) - events[instanceID].MetricSetFields.Put("instance.state.code", *instanceOutput[instanceID].State.Code) - events[instanceID].MetricSetFields.Put("instance.monitoring.state", monitoringState) - events[instanceID].MetricSetFields.Put("instance.public.dns_name", *instanceOutput[instanceID].PublicDnsName) - events[instanceID].MetricSetFields.Put("instance.private.dns_name", *instanceOutput[instanceID].PrivateDnsName) + privateIP := instanceOutput[instanceID].PrivateIpAddress + if privateIP != nil { + events[instanceID].MetricSetFields.Put("instance.private.ip", *privateIP) } + + events[instanceID].MetricSetFields.Put("instance.image.id", *instanceOutput[instanceID].ImageId) + events[instanceID].MetricSetFields.Put("instance.state.name", instanceStateName) + events[instanceID].MetricSetFields.Put("instance.state.code", *instanceOutput[instanceID].State.Code) + events[instanceID].MetricSetFields.Put("instance.monitoring.state", monitoringState) + events[instanceID].MetricSetFields.Put("instance.public.dns_name", *instanceOutput[instanceID].PublicDnsName) + events[instanceID].MetricSetFields.Put("instance.private.dns_name", *instanceOutput[instanceID].PrivateDnsName) } } diff --git a/x-pack/metricbeat/module/aws/rds/rds.go b/x-pack/metricbeat/module/aws/rds/rds.go index f8bd907b3f6..08d2e84038d 100644 --- a/x-pack/metricbeat/module/aws/rds/rds.go +++ b/x-pack/metricbeat/module/aws/rds/rds.go @@ -273,70 +273,72 @@ func (m *MetricSet) createCloudWatchEvents(getMetricDataResults []cloudwatch.Met // Find a timestamp for all metrics in output timestamp := aws.FindTimestamp(getMetricDataResults) - if !timestamp.IsZero() { - for _, output := range getMetricDataResults { - if len(output.Values) == 0 { - continue + if timestamp.IsZero() { + return nil, nil + } + + for _, output := range getMetricDataResults { + if len(output.Values) == 0 { + continue + } + exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) + if exists { + labels := strings.Split(*output.Label, " ") + // Collect dimension values from the labels and initialize events and metricSetFieldResults with dimValues + var dimValues string + for i := 1; i < len(labels); i += 2 { + dimValues = dimValues + labels[i+1] } - exists, timestampIdx := aws.CheckTimestampInArray(timestamp, output.Timestamps) - if exists { - labels := strings.Split(*output.Label, " ") - // Collect dimension values from the labels and initialize events and metricSetFieldResults with dimValues - var dimValues string - for i := 1; i < len(labels); i += 2 { - dimValues = dimValues + labels[i+1] - } - if _, ok := events[dimValues]; !ok { - events[dimValues] = aws.InitEvent(regionName, m.AccountName, m.AccountID) - } + if _, ok := events[dimValues]; !ok { + events[dimValues] = aws.InitEvent(regionName, m.AccountName, m.AccountID, timestamp) + } - if _, ok := metricSetFieldResults[dimValues]; !ok { - metricSetFieldResults[dimValues] = map[string]interface{}{} - } + if _, ok := metricSetFieldResults[dimValues]; !ok { + metricSetFieldResults[dimValues] = map[string]interface{}{} + } - if len(output.Values) > timestampIdx && len(labels) > 0 { - if labels[metricNameIdx] == "CPUUtilization" { - metricSetFieldResults[dimValues][labels[metricNameIdx]] = fmt.Sprint(output.Values[timestampIdx] / 100) - } else { - metricSetFieldResults[dimValues][labels[metricNameIdx]] = fmt.Sprint(output.Values[timestampIdx]) - } + if len(output.Values) > timestampIdx && len(labels) > 0 { + if labels[metricNameIdx] == "CPUUtilization" { + metricSetFieldResults[dimValues][labels[metricNameIdx]] = fmt.Sprint(output.Values[timestampIdx] / 100) + } else { + metricSetFieldResults[dimValues][labels[metricNameIdx]] = fmt.Sprint(output.Values[timestampIdx]) + } - for i := 1; i < len(labels); i += 2 { - if labels[i] == "DBInstanceIdentifier" { - dbIdentifier := labels[i+1] - if _, found := events[dbIdentifier]; found { - if _, found := dbInstanceMap[dbIdentifier]; !found { - delete(metricSetFieldResults, dimValues) - continue - } - events[dbIdentifier].RootFields.Put("cloud.availability_zone", dbInstanceMap[dbIdentifier].dbAvailabilityZone) - events[dbIdentifier].MetricSetFields.Put("db_instance.arn", dbInstanceMap[dbIdentifier].dbArn) - events[dbIdentifier].MetricSetFields.Put("db_instance.class", dbInstanceMap[dbIdentifier].dbClass) - events[dbIdentifier].MetricSetFields.Put("db_instance.identifier", dbInstanceMap[dbIdentifier].dbIdentifier) - events[dbIdentifier].MetricSetFields.Put("db_instance.status", dbInstanceMap[dbIdentifier].dbStatus) - - for _, tag := range dbInstanceMap[dbIdentifier].tags { - events[dbIdentifier].ModuleFields.Put("tags."+tag.Key, tag.Value) - } + for i := 1; i < len(labels); i += 2 { + if labels[i] == "DBInstanceIdentifier" { + dbIdentifier := labels[i+1] + if _, found := events[dbIdentifier]; found { + if _, found := dbInstanceMap[dbIdentifier]; !found { + delete(metricSetFieldResults, dimValues) + continue + } + events[dbIdentifier].RootFields.Put("cloud.availability_zone", dbInstanceMap[dbIdentifier].dbAvailabilityZone) + events[dbIdentifier].MetricSetFields.Put("db_instance.arn", dbInstanceMap[dbIdentifier].dbArn) + events[dbIdentifier].MetricSetFields.Put("db_instance.class", dbInstanceMap[dbIdentifier].dbClass) + events[dbIdentifier].MetricSetFields.Put("db_instance.identifier", dbInstanceMap[dbIdentifier].dbIdentifier) + events[dbIdentifier].MetricSetFields.Put("db_instance.status", dbInstanceMap[dbIdentifier].dbStatus) + + for _, tag := range dbInstanceMap[dbIdentifier].tags { + events[dbIdentifier].ModuleFields.Put("tags."+tag.Key, tag.Value) } } - metricSetFieldResults[dimValues][labels[i]] = fmt.Sprint(labels[(i + 1)]) + } + metricSetFieldResults[dimValues][labels[i]] = fmt.Sprint(labels[(i + 1)]) + } + + // if tags_filter is given, then only return metrics with DBInstanceIdentifier as dimension + if m.TagsFilter != nil { + if len(labels) == 1 { + delete(events, dimValues) + delete(metricSetFieldResults, dimValues) } - // if tags_filter is given, then only return metrics with DBInstanceIdentifier as dimension - if m.TagsFilter != nil { - if len(labels) == 1 { + for i := 1; i < len(labels); i += 2 { + if labels[i] != "DBInstanceIdentifier" && i == len(labels)-2 { delete(events, dimValues) delete(metricSetFieldResults, dimValues) } - - for i := 1; i < len(labels); i += 2 { - if labels[i] != "DBInstanceIdentifier" && i == len(labels)-2 { - delete(events, dimValues) - delete(metricSetFieldResults, dimValues) - } - } } } } diff --git a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go index 7c9d453baca..d3a453f9508 100644 --- a/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go +++ b/x-pack/metricbeat/module/aws/s3_daily_storage/s3_daily_storage.go @@ -185,8 +185,6 @@ func createMetricDataQuery(metric cloudwatch.Metric, period time.Duration, index } func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName string, bucketName string, accountName string, accountID string) (event mb.Event, err error) { - event = aws.InitEvent(regionName, accountName, accountID) - // AWS s3_daily_storage metrics mapOfMetricSetFieldResults := make(map[string]interface{}) @@ -213,6 +211,7 @@ func createCloudWatchEvents(outputs []cloudwatch.MetricDataResult, regionName st return } + event = aws.InitEvent(regionName, accountName, accountID, timestamp) event.MetricSetFields = resultMetricSetFields event.RootFields.Put("aws.s3.bucket.name", bucketName) return diff --git a/x-pack/metricbeat/module/aws/s3_request/s3_request.go b/x-pack/metricbeat/module/aws/s3_request/s3_request.go index afe53ac49ba..15fc6a007c6 100644 --- a/x-pack/metricbeat/module/aws/s3_request/s3_request.go +++ b/x-pack/metricbeat/module/aws/s3_request/s3_request.go @@ -187,8 +187,6 @@ func constructMetricQueries(listMetricsOutputs []cloudwatch.Metric, period time. // CreateS3Events creates s3_request and s3_daily_storage events from Cloudwatch metric data. func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName string, bucketName string, accountName string, accountID string) (event mb.Event, err error) { - event = aws.InitEvent(regionName, accountName, accountID) - // AWS s3_request metrics mapOfMetricSetFieldResults := make(map[string]interface{}) @@ -215,6 +213,7 @@ func createS3RequestEvents(outputs []cloudwatch.MetricDataResult, regionName str return } + event = aws.InitEvent(regionName, accountName, accountID, timestamp) event.MetricSetFields = resultMetricSetFields event.RootFields.Put("aws.s3.bucket.name", bucketName) return diff --git a/x-pack/metricbeat/module/aws/sqs/sqs.go b/x-pack/metricbeat/module/aws/sqs/sqs.go index 7bc6a8349d0..6e066d33db7 100644 --- a/x-pack/metricbeat/module/aws/sqs/sqs.go +++ b/x-pack/metricbeat/module/aws/sqs/sqs.go @@ -174,9 +174,7 @@ func createMetricDataQuery(metric cloudwatch.Metric, index int, period time.Dura return } -func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, queueName string, metricsetName string, regionName string, schemaMetricFields s.Schema, accountName string, accountID string) (event mb.Event, err error) { - event = aws.InitEvent(regionName, accountName, accountID) - +func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, queueName string, regionName string, schemaMetricFields s.Schema, accountName string, accountID string) (event mb.Event, err error) { // AWS sqs metrics mapOfMetricSetFieldResults := make(map[string]interface{}) @@ -203,6 +201,7 @@ func createEventPerQueue(getMetricDataResults []cloudwatch.MetricDataResult, que return } + event = aws.InitEvent(regionName, accountName, accountID, timestamp) event.MetricSetFields = resultMetricSetFields event.MetricSetFields.Put("queue.name", queueName) return @@ -212,7 +211,7 @@ func createSQSEvents(queueURLs []string, metricDataResults []cloudwatch.MetricDa for _, queueURL := range queueURLs { queueURLParsed := strings.Split(queueURL, "/") queueName := queueURLParsed[len(queueURLParsed)-1] - event, err := createEventPerQueue(metricDataResults, queueName, metricsetName, regionName, schemaRequestFields, accountName, accountID) + event, err := createEventPerQueue(metricDataResults, queueName, regionName, schemaRequestFields, accountName, accountID) if err != nil { event.Error = err report.Event(event) diff --git a/x-pack/metricbeat/modules.d/aws.yml.disabled b/x-pack/metricbeat/modules.d/aws.yml.disabled index d0053297885..dcd49bf4856 100644 --- a/x-pack/metricbeat/modules.d/aws.yml.disabled +++ b/x-pack/metricbeat/modules.d/aws.yml.disabled @@ -7,6 +7,7 @@ - elb - natgateway - rds + - s3_request - transitgateway - usage - vpn @@ -47,4 +48,3 @@ period: 24h metricsets: - s3_daily_storage - - s3_request From 600b7e476788707b3e5c96dbde52b997b7cd1c40 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Fri, 2 Oct 2020 15:03:43 -0600 Subject: [PATCH 2/3] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a96a23db9d8..1a4f1055519 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -361,6 +361,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix remote_write flaky test. {pull}21173[21173] - Visualization title fixes in aws, azure and googlecloud compute dashboards. {pull}21098[21098] - Add a switch to the driver definition on SQL module to use pretty names {pull}17378[17378] +- Use timestamp from CloudWatch API when creating events. {pull}21498[21498] *Packetbeat* From 5cb6c5b43c17881dc3edc36178fb12261335cc0c Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 6 Oct 2020 09:49:01 -0600 Subject: [PATCH 3/3] Add TestCreateEventsTimestamp unit test for timestamp --- x-pack/metricbeat/module/aws/aws.go | 13 ++++---- .../module/aws/cloudwatch/cloudwatch_test.go | 33 ++++++++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/x-pack/metricbeat/module/aws/aws.go b/x-pack/metricbeat/module/aws/aws.go index 8b32edd6331..167e6a088a0 100644 --- a/x-pack/metricbeat/module/aws/aws.go +++ b/x-pack/metricbeat/module/aws/aws.go @@ -197,10 +197,13 @@ func StringInSlice(str string, list []string) (bool, int) { // InitEvent initialize mb.Event with basic information like service.name, cloud.provider func InitEvent(regionName string, accountName string, accountID string, timestamp time.Time) mb.Event { - event := mb.Event{} - event.MetricSetFields = common.MapStr{} - event.ModuleFields = common.MapStr{} - event.RootFields = common.MapStr{} + event := mb.Event{ + Timestamp: timestamp, + MetricSetFields: common.MapStr{}, + ModuleFields: common.MapStr{}, + RootFields: common.MapStr{}, + } + event.RootFields.Put("cloud.provider", "aws") if regionName != "" { event.RootFields.Put("cloud.region", regionName) @@ -211,8 +214,6 @@ func InitEvent(regionName string, accountName string, accountID string, timestam if accountID != "" { event.RootFields.Put("cloud.account.id", accountID) } - - event.Timestamp = timestamp return event } diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go index eb0ee1d5882..ecd4bb2f9d1 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go @@ -11,9 +11,6 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/libbeat/logp" - "github.com/elastic/beats/v7/metricbeat/mb" - awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/cloudwatch" "github.com/aws/aws-sdk-go-v2/service/cloudwatch/cloudwatchiface" @@ -22,12 +19,14 @@ import ( "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/beats/v7/x-pack/metricbeat/module/aws" ) var ( regionName = "us-west-1" - timestamp = time.Now() + timestamp = time.Date(2020, 10, 06, 00, 00, 00, 0, time.UTC) accountID = "123456789012" accountName = "test" @@ -1569,3 +1568,29 @@ func TestConfigDimensionValueContainsWildcard(t *testing.T) { }) } } + +func TestCreateEventsTimestamp(t *testing.T) { + m := MetricSet{ + logger: logp.NewLogger("test"), + CloudwatchConfigs: []Config{{Statistic: []string{"Average"}}}, + MetricSet: &aws.MetricSet{Period: 5, AccountID: accountID}, + } + + listMetricWithStatsTotal := []metricsWithStatistics{ + { + cloudwatch.Metric{ + MetricName: awssdk.String("CPUUtilization"), + Namespace: awssdk.String("AWS/EC2"), + }, + []string{"Average"}, + nil, + }, + } + + resourceTypeTagFilters := map[string][]aws.Tag{} + startTime, endTime := aws.GetStartTimeEndTime(m.MetricSet.Period, m.MetricSet.Latency) + + events, err := m.createEvents(&MockCloudWatchClientWithoutDim{}, &MockResourceGroupsTaggingClient{}, listMetricWithStatsTotal, resourceTypeTagFilters, regionName, startTime, endTime) + assert.NoError(t, err) + assert.Equal(t, timestamp, events[regionName+accountID+namespace].Timestamp) +}