Skip to content

Commit

Permalink
fix metadata fetching when metrics have suffixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dashpole committed Jan 5, 2022
1 parent 514a904 commit b012ce0
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 123 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
- `ecstaskobserver`: Fix "Incorrect conversion between integer types" security issue (#6939)
- Fix typo in "direction" metrics attribute description (#6949)
- `zookeeperreceiver`: Fix issue where receiver could panic during shutdown (#7020)
- `prometheusreceiver`: Fix metadata fetching when metrics differ by trimmable suffixes (#6932)

## 💡 Enhancements 💡

Expand Down
128 changes: 66 additions & 62 deletions receiver/prometheusreceiver/internal/metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

// MetricFamily is the unit that groups the metric items sharing the same TYPE/UNIT/... metadata within
// a single scrape.
type MetricFamily interface {
// Add feeds one sample into the family: metricName is the sample's metric name, ls its label set,
// t its timestamp and v its value. A non-nil error reports that the sample was not accepted.
// NOTE(review): t is presumably in milliseconds, like intervalStartTimeMs — confirm.
Add(metricName string, ls labels.Labels, t int64, v float64) error
// ToMetric converts the family into a *metricspb.Metric.
// NOTE(review): the two int results look like (timeseries produced, timeseries dropped) — confirm
// against the implementation.
ToMetric() (*metricspb.Metric, int, int)
}

type metricFamily struct {
name string
mtype metricspb.MetricDescriptor_Type
Expand All @@ -48,37 +41,35 @@ type metricFamily struct {
intervalStartTimeMs int64
}

func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) MetricFamily {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
metadata, ok := mc.Metadata(familyName)
if !ok && metricName != familyName {
// use the original metricName as metricFamily
familyName = metricName
// perform a 2nd lookup with the original metric name. it can happen if there's a metric which is not histogram
// or summary, but ends with one of those _count/_sum suffixes
metadata, ok = mc.Metadata(metricName)
// still not found, this can happen when metric has no TYPE HINT
if !ok {
metadata.Metric = familyName
metadata.Type = textparse.MetricTypeUnknown
func metadataForMetric(metricName string, mc MetadataCache) (*scrape.MetricMetadata, string) {
if metadata, ok := internalMetricMetadata[metricName]; ok {
return metadata, metricName
}
if metadata, ok := mc.Metadata(metricName); ok {
return &metadata, metricName
}
// If we didn't find metadata with the original name,
// try with suffixes trimmed, in case it is a "merged" metric type.
normalizedName := normalizeMetricName(metricName)
if metadata, ok := mc.Metadata(normalizedName); ok {
if metadata.Type == textparse.MetricTypeCounter {
return &metadata, metricName
}
} else if !ok && isInternalMetric(metricName) {
metadata = defineInternalMetric(metricName, metadata, logger)
} else if !ok {
// Prometheus sends metrics without a type hint as gauges.
// MetricTypeUnknown is converted to a gauge in convToOCAMetricType()
metadata.Type = textparse.MetricTypeUnknown
return &metadata, normalizedName
}
// Otherwise, the metric is unknown
return &scrape.MetricMetadata{
Metric: metricName,
Type: textparse.MetricTypeUnknown,
}, metricName
}

func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, intervalStartTimeMs int64) *metricFamily {
metadata, familyName := metadataForMetric(metricName, mc)
ocaMetricType := convToOCAMetricType(metadata.Type)

// If a counter has a _total suffix but its metadata is stored without it, keep the _total suffix in the name;
// otherwise the metric sent won't have the suffix
if ocaMetricType == metricspb.MetricDescriptor_CUMULATIVE_DOUBLE && strings.HasSuffix(metricName, metricSuffixTotal) {
familyName = metricName
} else if ocaMetricType == metricspb.MetricDescriptor_UNSPECIFIED {
logger.Debug(fmt.Sprintf("Invalid metric : %s %+v", metricName, metadata))
if ocaMetricType == metricspb.MetricDescriptor_UNSPECIFIED {
logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))
}

return &metricFamily{
Expand All @@ -88,40 +79,41 @@ func newMetricFamily(metricName string, mc MetadataCache, logger *zap.Logger, in
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
metadata: metadata,
groupOrders: make(map[string]int),
groups: make(map[string]*metricGroup),
intervalStartTimeMs: intervalStartTimeMs,
}
}

// Manually define the metadata of the Prometheus scraper's internal metrics
func defineInternalMetric(metricName string, metadata scrape.MetricMetadata, logger *zap.Logger) scrape.MetricMetadata {
if metadata.Metric != "" && metadata.Type != "" && metadata.Help != "" {
logger.Debug("Internal metric seems already fully defined")
return metadata
}
metadata.Metric = metricName

switch metricName {
case scrapeUpMetricName:
metadata.Type = textparse.MetricTypeGauge
metadata.Help = "The scraping was successful"
case "scrape_duration_seconds":
metadata.Unit = "seconds"
metadata.Type = textparse.MetricTypeGauge
metadata.Help = "Duration of the scrape"
case "scrape_samples_scraped":
metadata.Type = textparse.MetricTypeGauge
metadata.Help = "The number of samples the target exposed"
case "scrape_series_added":
metadata.Type = textparse.MetricTypeGauge
metadata.Help = "The approximate number of new series in this scrape"
case "scrape_samples_post_metric_relabeling":
metadata.Type = textparse.MetricTypeGauge
metadata.Help = "The number of samples remaining after metric relabeling was applied"
}
return metadata
// internalMetricMetadata holds hand-written metadata for the internal scrape metrics
// (the series named by scrapeUpMetricName and the scrape_* series), keyed by metric name.
// Every entry repeats its key in the Metric field and declares the series as a gauge.
// The values are pointers, so each lookup of a given name yields the same
// *scrape.MetricMetadata; treat the entries as read-only.
// NOTE(review): presumably these series are synthesized by the scrape loop and therefore
// have no entry in the target's metadata cache — confirm.
var internalMetricMetadata = map[string]*scrape.MetricMetadata{
scrapeUpMetricName: {
Metric: scrapeUpMetricName,
Type: textparse.MetricTypeGauge,
Help: "The scraping was successful",
},
"scrape_duration_seconds": {
Metric: "scrape_duration_seconds",
// The only internal metric that declares a unit.
Unit: "seconds",
Type: textparse.MetricTypeGauge,
Help: "Duration of the scrape",
},
"scrape_samples_scraped": {
Metric: "scrape_samples_scraped",
Type: textparse.MetricTypeGauge,
Help: "The number of samples the target exposed",
},
"scrape_series_added": {
Metric: "scrape_series_added",
Type: textparse.MetricTypeGauge,
Help: "The approximate number of new series in this scrape",
},
"scrape_samples_post_metric_relabeling": {
Metric: "scrape_samples_post_metric_relabeling",
Type: textparse.MetricTypeGauge,
Help: "The number of samples remaining after metric relabeling was applied",
},
}

// updateLabelKeys is used to store all the label keys of the same metric family in observed order. Since prometheus
Expand All @@ -142,6 +134,18 @@ func (mf *metricFamily) updateLabelKeys(ls labels.Labels) {
}
}

// includesMetric reports whether metricName belongs to this family.
//
// Cumulative families and gauge distributions are compared against the
// suffix-trimmed form of metricName (via normalizeMetricName); every other
// family type requires metricName to equal the family name exactly.
func (mf *metricFamily) includesMetric(metricName string) bool {
	candidate := metricName
	if mf.isCumulativeType() || mf.mtype == metricspb.MetricDescriptor_GAUGE_DISTRIBUTION {
		// Types whose series may carry trimmable suffixes are matched on the trimmed name.
		candidate = normalizeMetricName(metricName)
	}
	return candidate == mf.name
}

func (mf *metricFamily) isCumulativeType() bool {
return mf.mtype == metricspb.MetricDescriptor_CUMULATIVE_DOUBLE ||
mf.mtype == metricspb.MetricDescriptor_CUMULATIVE_INT64 ||
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type metricBuilder struct {
startTime float64
intervalStartTimeMs int64
logger *zap.Logger
families map[string]MetricFamily
families map[string]*metricFamily
}

// newMetricBuilder creates a MetricBuilder which is allowed to feed all the datapoints from a single prometheus
Expand All @@ -72,7 +72,7 @@ func newMetricBuilder(mc MetadataCache, useStartTimeMetric bool, startTimeMetric
return &metricBuilder{
mc: mc,
metrics: make([]*metricspb.Metric, 0),
families: map[string]MetricFamily{},
families: map[string]*metricFamily{},
logger: logger,
numTimeseries: 0,
droppedTimeseries: 0,
Expand Down Expand Up @@ -138,14 +138,14 @@ func (b *metricBuilder) AddDataPoint(ls labels.Labels, t int64, v float64) error

b.hasData = true

familyName := normalizeMetricName(metricName)
curMF, ok := b.families[familyName]
curMF, ok := b.families[metricName]
if !ok {
if mf, ok := b.families[metricName]; ok {
familyName := normalizeMetricName(metricName)
if mf, ok := b.families[familyName]; ok && mf.includesMetric(metricName) {
curMF = mf
} else {
curMF = newMetricFamily(metricName, b.mc, b.logger, b.intervalStartTimeMs)
b.families[familyName] = curMF
b.families[curMF.name] = curMF
}
}

Expand Down
6 changes: 2 additions & 4 deletions receiver/prometheusreceiver/internal/metricsbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ var testMetadata = map[string]scrape.MetricMetadata{
"summary_test": {Metric: "summary_test", Type: textparse.MetricTypeSummary, Help: "", Unit: ""},
"summary_test2": {Metric: "summary_test2", Type: textparse.MetricTypeSummary, Help: "", Unit: ""},
"unknown_test": {Metric: "unknown_test", Type: textparse.MetricTypeUnknown, Help: "", Unit: ""},
"poor_name": {Metric: "poor_name", Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
"poor_name_count": {Metric: "poor_name_count", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"up": {Metric: "up", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"scrape_foo": {Metric: "scrape_foo", Type: textparse.MetricTypeCounter, Help: "", Unit: ""},
"example_process_start_time_seconds": {Metric: "example_process_start_time_seconds",
Type: textparse.MetricTypeGauge, Help: "", Unit: ""},
Expand Down Expand Up @@ -242,10 +242,8 @@ func Test_metricBuilder_counters(t *testing.T) {
},
},
},
// Some counters such as "python_gc_collections_total" have metadata key as "python_gc_collections" but still need
// to be converted using full metric name as "python_gc_collections_total" to match Prometheus functionality
{
name: "counter-with-metadata-without-total-suffix",
name: "counter-with-total-suffix",
inputs: []*testScrapedPage{
{
pts: []*testDataPoint{
Expand Down
58 changes: 16 additions & 42 deletions receiver/prometheusreceiver/internal/otlp_metricfamily.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,11 @@ import (
"time"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/textparse"
"github.com/prometheus/prometheus/scrape"
"go.opentelemetry.io/collector/model/pdata"
"go.uber.org/zap"
)

// MetricFamilyPdata is the unit that groups the metric items sharing the same TYPE/UNIT/... metadata within
// a single scrape.
type MetricFamilyPdata interface {
// Add feeds one sample into the family: metricName is the sample's metric name, ls its label set,
// t its timestamp and v its value. A non-nil error reports that the sample was not accepted.
// NOTE(review): the unit of t is not visible here — presumably milliseconds; confirm.
Add(metricName string, ls labels.Labels, t int64, v float64) error
// ToMetricPdata writes the family into the supplied pdata.MetricSlice.
// NOTE(review): the two int results look like (timeseries produced, timeseries dropped) — confirm
// against the implementation.
ToMetricPdata(metrics *pdata.MetricSlice) (int, int)
}

type metricFamilyPdata struct {
mtype pdata.MetricDataType
groups map[string]*metricGroupPdata
Expand Down Expand Up @@ -61,41 +53,11 @@ type metricGroupPdata struct {
complexValue []*dataPoint
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger) MetricFamilyPdata {
familyName := normalizeMetricName(metricName)

// lookup metadata based on familyName
metadata, ok := mc.Metadata(familyName)
if !ok && metricName != familyName {
// use the original metricName as metricFamily
familyName = metricName
// perform a 2nd lookup with the original metric name. it can happen if there's a metric which is not histogram
// or summary, but ends with one of those _count/_sum suffixes
metadata, ok = mc.Metadata(metricName)
// still not found, this can happen when metric has no TYPE HINT
if !ok {
metadata.Metric = familyName
metadata.Type = textparse.MetricTypeUnknown
}
} else if !ok {
if isInternalMetric(metricName) {
metadata = defineInternalMetric(metricName, metadata, logger)
} else {
// Prometheus sends metrics without a type hint as gauges.
// MetricTypeUnknown is converted to a gauge in convToOCAMetricType()
metadata.Type = textparse.MetricTypeUnknown
}
}

func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logger) *metricFamilyPdata {
metadata, familyName := metadataForMetric(metricName, mc)
mtype := convToPdataMetricType(metadata.Type)
if mtype == pdata.MetricDataTypeNone {
logger.Debug(fmt.Sprintf("Invalid metric : %s %+v", metricName, metadata))
}

// If a counter has a _total suffix but its metadata is stored without it, keep the _total suffix in the name;
// otherwise the metric sent won't have the suffix
if mtype == pdata.MetricDataTypeSum && strings.HasSuffix(metricName, metricSuffixTotal) {
familyName = metricName
logger.Debug(fmt.Sprintf("Unknown-typed metric : %s %+v", metricName, metadata))
}

return &metricFamilyPdata{
Expand All @@ -106,7 +68,7 @@ func newMetricFamilyPdata(metricName string, mc MetadataCache, logger *zap.Logge
droppedTimeseries: 0,
labelKeys: make(map[string]bool),
labelKeysOrdered: make([]string, 0),
metadata: &metadata,
metadata: metadata,
groupOrders: make(map[string]int),
}
}
Expand All @@ -130,6 +92,18 @@ func (mf *metricFamilyPdata) updateLabelKeys(ls labels.Labels) {
}
}

// includesMetric reports whether metricName belongs to this family.
//
// Families of a non-merged type only contain the metric whose name equals the
// family name. Merged (cumulative) family types are instead matched on the
// suffix-trimmed form of metricName produced by normalizeMetricName.
func (mf *metricFamilyPdata) includesMetric(metricName string) bool {
	if !mf.isCumulativeTypePdata() {
		// Not a merged type: names must match exactly.
		return metricName == mf.name
	}
	return normalizeMetricName(metricName) == mf.name
}

func (mf *metricFamilyPdata) getGroupKey(ls labels.Labels) string {
mf.updateLabelKeys(ls)
return dpgSignature(mf.labelKeysOrdered, ls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func TestMetricGroupData_toDistributionUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop()).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricName, mc, zap.NewNop())
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestMetricGroupData_toSummaryUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop()).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.name, mc, zap.NewNop())
for _, lbs := range tt.labelsScrapes {
for _, scrape := range lbs.scrapes {
require.NoError(t, mp.Add(scrape.metric, lbs.labels.Copy(), scrape.at, scrape.value))
Expand Down Expand Up @@ -323,7 +323,7 @@ func TestMetricGroupData_toNumberDataUnitTest(t *testing.T) {
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop()).(*metricFamilyPdata)
mp := newMetricFamilyPdata(tt.metricKind, mc, zap.NewNop())
for _, tv := range tt.scrapes {
require.NoError(t, mp.Add(tv.metric, tt.labels.Copy(), tv.at, tv.value))
}
Expand Down
12 changes: 6 additions & 6 deletions receiver/prometheusreceiver/internal/otlp_metricsbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func convToPdataMetricType(metricType textparse.MetricType) pdata.MetricDataType

type metricBuilderPdata struct {
metrics pdata.MetricSlice
families map[string]MetricFamilyPdata
families map[string]*metricFamilyPdata
hasData bool
hasInternalMetric bool
mc MetadataCache
Expand All @@ -105,7 +105,7 @@ func newMetricBuilderPdata(mc MetadataCache, useStartTimeMetric bool, startTimeM
}
return &metricBuilderPdata{
metrics: pdata.NewMetricSlice(),
families: map[string]MetricFamilyPdata{},
families: map[string]*metricFamilyPdata{},
mc: mc,
logger: logger,
numTimeseries: 0,
Expand Down Expand Up @@ -172,14 +172,14 @@ func (b *metricBuilderPdata) AddDataPoint(ls labels.Labels, t int64, v float64)

b.hasData = true

familyName := normalizeMetricName(metricName)
curMF, ok := b.families[familyName]
curMF, ok := b.families[metricName]
if !ok {
if mf, ok := b.families[metricName]; ok {
familyName := normalizeMetricName(metricName)
if mf, ok := b.families[familyName]; ok && mf.includesMetric(metricName) {
curMF = mf
} else {
curMF = newMetricFamilyPdata(metricName, b.mc, b.logger)
b.families[familyName] = curMF
b.families[curMF.name] = curMF
}
}

Expand Down

0 comments on commit b012ce0

Please sign in to comment.