From aa6ed5e0addc3fab95d9023298c896704fdafc24 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 27 Aug 2024 19:40:55 +0000 Subject: [PATCH 1/4] switch to uint64 hash for identifier --- .../datapointstorage/datapointcache.go | 98 +++++++++----- .../datapointstorage/datapointcache_test.go | 128 ++++++++++-------- .../internal/normalization/benchmark_test.go | 8 +- .../normalization/disabled_normalizer.go | 8 +- .../normalization/standard_normalizer.go | 8 +- .../collector/internal/normalization/types.go | 8 +- exporter/collector/metrics.go | 24 +++- 7 files changed, 168 insertions(+), 114 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index f8c894a26..d45dc7385 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -15,9 +15,9 @@ package datapointstorage import ( - "fmt" + "hash" + "hash/fnv" "sort" - "strings" "sync" "time" @@ -30,10 +30,10 @@ import ( const gcInterval = 20 * time.Minute type Cache struct { - numberCache map[string]usedNumberPoint - summaryCache map[string]usedSummaryPoint - histogramCache map[string]usedHistogramPoint - exponentialHistogramCache map[string]usedExponentialHistogramPoint + numberCache map[uint64]usedNumberPoint + summaryCache map[uint64]usedSummaryPoint + histogramCache map[uint64]usedHistogramPoint + exponentialHistogramCache map[uint64]usedExponentialHistogramPoint numberLock sync.RWMutex summaryLock sync.RWMutex histogramLock sync.RWMutex @@ -63,10 +63,10 @@ type usedExponentialHistogramPoint struct { // NewCache instantiates a cache and starts background processes. func NewCache(shutdown <-chan struct{}) *Cache { c := &Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } go func() { ticker := time.NewTicker(gcInterval) @@ -79,7 +79,7 @@ func NewCache(shutdown <-chan struct{}) *Cache { // GetNumberDataPoint retrieves the point associated with the identifier, and whether // or not it was found. -func (c *Cache) GetNumberDataPoint(identifier string) (pmetric.NumberDataPoint, bool) { +func (c *Cache) GetNumberDataPoint(identifier uint64) (pmetric.NumberDataPoint, bool) { c.numberLock.RLock() defer c.numberLock.RUnlock() point, found := c.numberCache[identifier] @@ -90,7 +90,7 @@ func (c *Cache) GetNumberDataPoint(identifier string) (pmetric.NumberDataPoint, } // SetNumberDataPoint assigns the point to the identifier in the cache. -func (c *Cache) SetNumberDataPoint(identifier string, point pmetric.NumberDataPoint) { +func (c *Cache) SetNumberDataPoint(identifier uint64, point pmetric.NumberDataPoint) { c.numberLock.Lock() defer c.numberLock.Unlock() c.numberCache[identifier] = usedNumberPoint{point, atomic.NewBool(true)} @@ -98,7 +98,7 @@ func (c *Cache) SetNumberDataPoint(identifier string, point pmetric.NumberDataPo // GetSummaryDataPoint retrieves the point associated with the identifier, and whether // or not it was found. -func (c *Cache) GetSummaryDataPoint(identifier string) (pmetric.SummaryDataPoint, bool) { +func (c *Cache) GetSummaryDataPoint(identifier uint64) (pmetric.SummaryDataPoint, bool) { c.summaryLock.RLock() defer c.summaryLock.RUnlock() point, found := c.summaryCache[identifier] @@ -109,7 +109,7 @@ func (c *Cache) GetSummaryDataPoint(identifier string) (pmetric.SummaryDataPoint } // SetSummaryDataPoint assigns the point to the identifier in the cache. -func (c *Cache) SetSummaryDataPoint(identifier string, point pmetric.SummaryDataPoint) { +func (c *Cache) SetSummaryDataPoint(identifier uint64, point pmetric.SummaryDataPoint) { c.summaryLock.Lock() defer c.summaryLock.Unlock() c.summaryCache[identifier] = usedSummaryPoint{point, atomic.NewBool(true)} @@ -117,7 +117,7 @@ func (c *Cache) SetSummaryDataPoint(identifier string, point pmetric.SummaryData // GetHistogramDataPoint retrieves the point associated with the identifier, and whether // or not it was found. -func (c *Cache) GetHistogramDataPoint(identifier string) (pmetric.HistogramDataPoint, bool) { +func (c *Cache) GetHistogramDataPoint(identifier uint64) (pmetric.HistogramDataPoint, bool) { c.histogramLock.RLock() defer c.histogramLock.RUnlock() point, found := c.histogramCache[identifier] @@ -128,7 +128,7 @@ func (c *Cache) GetHistogramDataPoint(identifier string) (pmetric.HistogramDataP } // SetHistogramDataPoint assigns the point to the identifier in the cache. -func (c *Cache) SetHistogramDataPoint(identifier string, point pmetric.HistogramDataPoint) { +func (c *Cache) SetHistogramDataPoint(identifier uint64, point pmetric.HistogramDataPoint) { c.histogramLock.Lock() defer c.histogramLock.Unlock() c.histogramCache[identifier] = usedHistogramPoint{point, atomic.NewBool(true)} @@ -136,7 +136,7 @@ func (c *Cache) SetHistogramDataPoint(identifier string, point pmetric.Histogram // GetExponentialHistogramDataPoint retrieves the point associated with the identifier, and whether // or not it was found. -func (c *Cache) GetExponentialHistogramDataPoint(identifier string) (pmetric.ExponentialHistogramDataPoint, bool) { +func (c *Cache) GetExponentialHistogramDataPoint(identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) { c.exponentialHistogramLock.RLock() defer c.exponentialHistogramLock.RUnlock() point, found := c.exponentialHistogramCache[identifier] @@ -147,7 +147,7 @@ func (c *Cache) GetExponentialHistogramDataPoint(identifier string) (pmetric.Exp } // SetExponentialHistogramDataPoint assigns the point to the identifier in the cache. -func (c *Cache) SetExponentialHistogramDataPoint(identifier string, point pmetric.ExponentialHistogramDataPoint) { +func (c *Cache) SetExponentialHistogramDataPoint(identifier uint64, point pmetric.ExponentialHistogramDataPoint) { c.exponentialHistogramLock.Lock() defer c.exponentialHistogramLock.Unlock() c.exponentialHistogramCache[identifier] = usedExponentialHistogramPoint{point, atomic.NewBool(true)} @@ -212,27 +212,57 @@ func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool { } // Identifier returns the unique string identifier for a metric. -func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) string { - var b strings.Builder +func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) (uint64, error) { + var err error + h := fnv.New64() - // Resource identifiers - if resource != nil { - fmt.Fprintf(&b, "%v", resource.GetLabels()) + _, err = h.Write([]byte(resource.GetType())) + if err != nil { + return 0, err + } + _, err = h.Write([]byte(metric.Name())) + if err != nil { + return 0, err } - // Instrumentation library labels and additional resource labels - fmt.Fprintf(&b, " - %v", extraLabels) - - // Metric identifiers - fmt.Fprintf(&b, " - %s -", metric.Name()) - attrsIds := make([]string, 0, attributes.Len()) + attrs := make(map[string]string) attributes.Range(func(k string, v pcommon.Value) bool { - attrsIds = append(attrsIds, k+"="+v.AsString()) + attrs[k] = v.AsString() return true }) - if len(attrsIds) > 0 { - sort.Strings(attrsIds) - fmt.Fprint(&b, " "+strings.Join(attrsIds, " ")) + + err = hashOfMap(h, extraLabels) + if err != nil { + return 0, err + } + + err = hashOfMap(h, attrs) + if err != nil { + return 0, err + } + + err = hashOfMap(h, resource.GetLabels()) + if err != nil { + return 0, err + } + return h.Sum64(), err +} + +func hashOfMap(h hash.Hash64, m map[string]string) error { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + for _, key := range keys { + _, err := h.Write([]byte(key)) + if err != nil { + return err + } + _, err = h.Write([]byte(m[key])) + if err != nil { + return err + } } - return b.String() + return nil } diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go index 0b050b4f9..a3d597268 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache_test.go +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -27,16 +27,17 @@ import ( func TestSetAndGet(t *testing.T) { c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } - _, found := c.GetNumberDataPoint("bar") + id := uint64(12345) + _, found := c.GetNumberDataPoint(id) assert.False(t, found) setPoint := pmetric.NewNumberDataPoint() - c.SetNumberDataPoint("bar", setPoint) - point, found := c.GetNumberDataPoint("bar") + c.SetNumberDataPoint(id, setPoint) + point, found := c.GetNumberDataPoint(id) assert.Equal(t, point, setPoint) assert.True(t, found) } @@ -44,10 +45,10 @@ func TestSetAndGet(t *testing.T) { func TestShutdown(t *testing.T) { shutdown := make(chan struct{}) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } close(shutdown) // gc should return after shutdown is closed @@ -58,17 +59,18 @@ func TestShutdown(t *testing.T) { func TestGC(t *testing.T) { shutdown := make(chan struct{}) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } fakeTicker := make(chan time.Time) + id := uint64(12345) - c.SetNumberDataPoint("bar", pmetric.NumberDataPoint{}) + c.SetNumberDataPoint(id, pmetric.NumberDataPoint{}) // bar exists since we just set it - usedPoint, found := c.numberCache["bar"] + usedPoint, found := c.numberCache[id] assert.True(t, usedPoint.used.Load()) assert.True(t, found) @@ -78,7 +80,7 @@ func TestGC(t *testing.T) { }() cont := c.gc(shutdown, fakeTicker) assert.True(t, cont) - usedPoint, found = c.numberCache["bar"] + usedPoint, found = c.numberCache[id] assert.False(t, usedPoint.used.Load()) assert.True(t, found) @@ -88,24 +90,25 @@ func TestGC(t *testing.T) { }() cont = c.gc(shutdown, fakeTicker) assert.True(t, cont) - _, found = c.numberCache["bar"] + _, found = c.numberCache[id] assert.False(t, found) } func TestGetPreventsGC(t *testing.T) { + id := uint64(12345) shutdown := make(chan struct{}) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } fakeTicker := make(chan time.Time) setPoint := pmetric.NewNumberDataPoint() - c.SetNumberDataPoint("bar", setPoint) + c.SetNumberDataPoint(id, setPoint) // bar exists since we just set it - _, found := c.numberCache["bar"] + _, found := c.numberCache[id] assert.True(t, found) // first gc tick marks bar stale go func() { @@ -114,7 +117,7 @@ func TestGetPreventsGC(t *testing.T) { cont := c.gc(shutdown, fakeTicker) assert.True(t, cont) // calling Get() marks it fresh again. - _, found = c.GetNumberDataPoint("bar") + _, found = c.GetNumberDataPoint(id) assert.True(t, found) // second gc tick does not remove bar go func() { @@ -122,16 +125,17 @@ func TestGetPreventsGC(t *testing.T) { }() cont = c.gc(shutdown, fakeTicker) assert.True(t, cont) - _, found = c.numberCache["bar"] + _, found = c.numberCache[id] assert.True(t, found) } func TestConcurrentNumber(t *testing.T) { + id := uint64(12345) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } setPoint := pmetric.NewNumberDataPoint() @@ -139,8 +143,8 @@ func TestConcurrentNumber(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) go func() { - c.SetNumberDataPoint("bar", setPoint) - point, found := c.GetNumberDataPoint("bar") + c.SetNumberDataPoint(id, setPoint) + point, found := c.GetNumberDataPoint(id) assert.Equal(t, point, setPoint) assert.True(t, found) wg.Done() @@ -158,11 +162,12 @@ func TestConcurrentNumber(t *testing.T) { } func TestConcurrentSummary(t *testing.T) { + id := uint64(12345) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } setPoint := pmetric.NewSummaryDataPoint() @@ -170,8 +175,8 @@ func TestConcurrentSummary(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) go func() { - c.SetSummaryDataPoint("bar", setPoint) - point, found := c.GetSummaryDataPoint("bar") + c.SetSummaryDataPoint(id, setPoint) + point, found := c.GetSummaryDataPoint(id) assert.Equal(t, point, setPoint) assert.True(t, found) wg.Done() @@ -189,11 +194,12 @@ func TestConcurrentSummary(t *testing.T) { } func TestConcurrentHistogram(t *testing.T) { + id := uint64(12345) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } setPoint := pmetric.NewHistogramDataPoint() @@ -201,8 +207,8 @@ func TestConcurrentHistogram(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) go func() { - c.SetHistogramDataPoint("bar", setPoint) - point, found := c.GetHistogramDataPoint("bar") + c.SetHistogramDataPoint(id, setPoint) + point, found := c.GetHistogramDataPoint(id) assert.Equal(t, point, setPoint) assert.True(t, found) wg.Done() @@ -220,11 +226,12 @@ func TestConcurrentHistogram(t *testing.T) { } func TestConcurrentExponentialHistogram(t *testing.T) { + id := uint64(12345) c := Cache{ - numberCache: make(map[string]usedNumberPoint), - summaryCache: make(map[string]usedSummaryPoint), - histogramCache: make(map[string]usedHistogramPoint), - exponentialHistogramCache: make(map[string]usedExponentialHistogramPoint), + numberCache: make(map[uint64]usedNumberPoint), + summaryCache: make(map[uint64]usedSummaryPoint), + histogramCache: make(map[uint64]usedHistogramPoint), + exponentialHistogramCache: make(map[uint64]usedExponentialHistogramPoint), } setPoint := pmetric.NewExponentialHistogramDataPoint() @@ -232,8 +239,8 @@ func TestConcurrentExponentialHistogram(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) go func() { - c.SetExponentialHistogramDataPoint("bar", setPoint) - point, found := c.GetExponentialHistogramDataPoint("bar") + c.SetExponentialHistogramDataPoint(id, setPoint) + point, found := c.GetExponentialHistogramDataPoint(id) assert.Equal(t, point, setPoint) assert.True(t, found) wg.Done() @@ -274,43 +281,43 @@ func TestIdentifier(t *testing.T) { metric pmetric.Metric labels pcommon.Map desc string - want string + want uint64 }{ { desc: "empty", - want: " - map[] - -", + want: 14695981039346656037, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with name", - want: " - map[] - custom.googleapis.com/test.metric -", + want: 4430695870278802542, metric: metricWithName, labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with attributes", - want: " - map[] - - bool=true int=123 string=strval", + want: 12689322622784691089, metric: pmetric.NewMetric(), labels: dpWithAttributes.Attributes(), }, { desc: "with resource", - want: "map[location:us-central1-b project:project-foo] - map[] - -", + want: 10341971859444782974, resource: monitoredResource, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with extra labels", - want: " - map[foo:bar hello:world] - -", + want: 5179797973592387646, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), extraLabels: extraLabels, }, { desc: "with all", - want: "map[location:us-central1-b project:project-foo] - map[foo:bar hello:world] - custom.googleapis.com/test.metric - bool=true int=123 string=strval", + want: 322569983349436668, metric: metricWithName, labels: dpWithAttributes.Attributes(), extraLabels: extraLabels, @@ -320,9 +327,10 @@ func TestIdentifier(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { origLabels := pcommon.NewMap() tc.labels.CopyTo(origLabels) - got := Identifier(tc.resource, tc.extraLabels, tc.metric, tc.labels) + got, err := Identifier(tc.resource, tc.extraLabels, tc.metric, tc.labels) + assert.NoError(t, err) if tc.want != got { - t.Errorf("Identifier() = %q; want %q", got, tc.want) + t.Errorf("Identifier() = %d; want %d", got, tc.want) } assert.Equal(t, origLabels, tc.labels) // Make sure the labels are not mutated }) diff --git a/exporter/collector/internal/normalization/benchmark_test.go b/exporter/collector/internal/normalization/benchmark_test.go index 3c18cfeb3..bdb15b8c8 100644 --- a/exporter/collector/internal/normalization/benchmark_test.go +++ b/exporter/collector/internal/normalization/benchmark_test.go @@ -33,7 +33,7 @@ func BenchmarkNormalizeNumberDataPoint(b *testing.B) { startPoint.SetIntValue(12) startPoint.Exemplars().AppendEmpty().SetIntValue(0) addAttributes(startPoint.Attributes()) - id := "abc123" + id := uint64(12345) // ensure each run is the same by skipping the first call, which will populate caches normalizer.NormalizeNumberDataPoint(startPoint, id) newPoint := pmetric.NewNumberDataPoint() @@ -60,7 +60,7 @@ func BenchmarkNormalizeHistogramDataPoint(b *testing.B) { startPoint.ExplicitBounds().FromRaw([]float64{1, 2}) startPoint.Exemplars().AppendEmpty().SetIntValue(0) addAttributes(startPoint.Attributes()) - id := "abc123" + id := uint64(12345) // ensure each run is the same by skipping the first call, which will populate caches normalizer.NormalizeHistogramDataPoint(startPoint, id) newPoint := pmetric.NewHistogramDataPoint() @@ -91,7 +91,7 @@ func BenchmarkNormalizeExopnentialHistogramDataPoint(b *testing.B) { startPoint.Negative().BucketCounts().FromRaw([]uint64{1, 1}) startPoint.Negative().SetOffset(1) addAttributes(startPoint.Attributes()) - id := "abc123" + id := uint64(12345) // ensure each run is the same by skipping the first call, which will populate caches normalizer.NormalizeExponentialHistogramDataPoint(startPoint, id) newPoint := pmetric.NewExponentialHistogramDataPoint() @@ -116,7 +116,7 @@ func BenchmarkNormalizeSummaryDataPoint(b *testing.B) { startPoint.SetSum(10.1) startPoint.QuantileValues().AppendEmpty().SetValue(1) addAttributes(startPoint.Attributes()) - id := "abc123" + id := uint64(12345) // ensure each run is the same by skipping the first call, which will populate caches normalizer.NormalizeSummaryDataPoint(startPoint, id) newPoint := pmetric.NewSummaryDataPoint() diff --git a/exporter/collector/internal/normalization/disabled_normalizer.go b/exporter/collector/internal/normalization/disabled_normalizer.go index 6d7e8e1a0..310fe30fa 100644 --- a/exporter/collector/internal/normalization/disabled_normalizer.go +++ b/exporter/collector/internal/normalization/disabled_normalizer.go @@ -34,7 +34,7 @@ func NewDisabledNormalizer() Normalizer { type disabledNormalizer struct{} // NormalizeExponentialHistogramDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, _ string) (pmetric.ExponentialHistogramDataPoint, bool) { +func (d *disabledNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, _ uint64) (pmetric.ExponentialHistogramDataPoint, bool) { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. // Make a copy so we don't mutate underlying data. @@ -48,7 +48,7 @@ func (d *disabledNormalizer) NormalizeExponentialHistogramDataPoint(point pmetri } // NormalizeHistogramDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, _ string) (pmetric.HistogramDataPoint, bool) { +func (d *disabledNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, _ uint64) (pmetric.HistogramDataPoint, bool) { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. // Make a copy so we don't mutate underlying data. @@ -62,7 +62,7 @@ func (d *disabledNormalizer) NormalizeHistogramDataPoint(point pmetric.Histogram } // NormalizeNumberDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, _ string) (pmetric.NumberDataPoint, bool) { +func (d *disabledNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, _ uint64) (pmetric.NumberDataPoint, bool) { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. // Make a copy so we don't mutate underlying data. @@ -76,7 +76,7 @@ func (d *disabledNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPo } // NormalizeSummaryDataPoint returns the point without normalizing. -func (d *disabledNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, _ string) (pmetric.SummaryDataPoint, bool) { +func (d *disabledNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, _ uint64) (pmetric.SummaryDataPoint, bool) { if !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { // Handle explicit reset points. // Make a copy so we don't mutate underlying data. diff --git a/exporter/collector/internal/normalization/standard_normalizer.go b/exporter/collector/internal/normalization/standard_normalizer.go index 71c37c4b5..155ca194b 100644 --- a/exporter/collector/internal/normalization/standard_normalizer.go +++ b/exporter/collector/internal/normalization/standard_normalizer.go @@ -47,7 +47,7 @@ type standardNormalizer struct { log *zap.Logger } -func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier string) (pmetric.ExponentialHistogramDataPoint, bool) { +func (s *standardNormalizer) NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) { start, hasStart := s.startCache.GetExponentialHistogramDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -152,7 +152,7 @@ func subtractExponentialBuckets(a, b pmetric.ExponentialHistogramDataPointBucket return newBuckets } -func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier string) (pmetric.HistogramDataPoint, bool) { +func (s *standardNormalizer) NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) (pmetric.HistogramDataPoint, bool) { start, hasStart := s.startCache.GetHistogramDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -258,7 +258,7 @@ func bucketBoundariesEqual(a, b pcommon.Float64Slice) bool { // NormalizeNumberDataPoint normalizes a cumulative, monotonic sum. // It returns the normalized point, and true if the point should be kept. -func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier string) (pmetric.NumberDataPoint, bool) { +func (s *standardNormalizer) NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) (pmetric.NumberDataPoint, bool) { start, hasStart := s.startCache.GetNumberDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { @@ -341,7 +341,7 @@ func subtractNumberDataPoint(a, b pmetric.NumberDataPoint) pmetric.NumberDataPoi return newPoint } -func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier string) (pmetric.SummaryDataPoint, bool) { +func (s *standardNormalizer) NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) (pmetric.SummaryDataPoint, bool) { start, hasStart := s.startCache.GetSummaryDataPoint(identifier) if !hasStart { if point.StartTimestamp() == 0 || !point.StartTimestamp().AsTime().Before(point.Timestamp().AsTime()) { diff --git a/exporter/collector/internal/normalization/types.go b/exporter/collector/internal/normalization/types.go index da3c9e9b1..5b1a2268f 100644 --- a/exporter/collector/internal/normalization/types.go +++ b/exporter/collector/internal/normalization/types.go @@ -22,14 +22,14 @@ import ( type Normalizer interface { // NormalizeExponentialHistogramDataPoint normalizes an exponential histogram. // It returns the normalized point, and true if the point should be kept. - NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier string) (pmetric.ExponentialHistogramDataPoint, bool) + NormalizeExponentialHistogramDataPoint(point pmetric.ExponentialHistogramDataPoint, identifier uint64) (pmetric.ExponentialHistogramDataPoint, bool) // NormalizeHistogramDataPoint normalizes a cumulative histogram. // It returns the normalized point, and true if the point should be kept. - NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier string) (pmetric.HistogramDataPoint, bool) + NormalizeHistogramDataPoint(point pmetric.HistogramDataPoint, identifier uint64) (pmetric.HistogramDataPoint, bool) // NormalizeNumberDataPoint normalizes a cumulative, monotonic sum. // It returns the normalized point, and true if the point should be kept. - NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier string) (pmetric.NumberDataPoint, bool) + NormalizeNumberDataPoint(point pmetric.NumberDataPoint, identifier uint64) (pmetric.NumberDataPoint, bool) // NormalizeSummaryDataPoint normalizes a summary. // It returns the normalized point, and true if the point should be kept. - NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier string) (pmetric.SummaryDataPoint, bool) + NormalizeSummaryDataPoint(point pmetric.SummaryDataPoint, identifier uint64) (pmetric.SummaryDataPoint, bool) } diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go index f2e1abba0..a9b4b0a59 100644 --- a/exporter/collector/metrics.go +++ b/exporter/collector/metrics.go @@ -875,7 +875,11 @@ func (m *metricMapper) summaryPointToTimeSeries( return nil } // Normalize the summary point. - metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + if err != nil { + m.obs.log.Debug("Failed to get identifier for summary metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) + return nil + } normalizedPoint, keep := m.normalizer.NormalizeSummaryDataPoint(point, metricIdentifier) if !keep { return nil @@ -1148,7 +1152,11 @@ func (m *metricMapper) histogramToTimeSeries( } if hist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize cumulative histogram points. - metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + if err != nil { + m.obs.log.Debug("Failed to get identifier for histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) + return nil + } normalizedPoint, keep := m.normalizer.NormalizeHistogramDataPoint(point, metricIdentifier) if !keep { return nil @@ -1202,7 +1210,11 @@ func (m *metricMapper) exponentialHistogramToTimeSeries( } if exponentialHist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize the histogram point. - metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + if err != nil { + m.obs.log.Debug("Failed to get identifier for exponential histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) + return nil + } normalizedPoint, keep := m.normalizer.NormalizeExponentialHistogramDataPoint(point, metricIdentifier) if !keep { return nil @@ -1257,7 +1269,11 @@ func (m *metricMapper) sumPointToTimeSeries( } if sum.IsMonotonic() { if sum.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { - metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) + if err != nil { + m.obs.log.Debug("Failed to get identifier for sum metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) + return nil + } normalizedPoint, keep := m.normalizer.NormalizeNumberDataPoint(point, metricIdentifier) if !keep { return nil From da58aa2d2f02b386b0499c433172f4329437778c Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Thu, 29 Aug 2024 19:15:59 +0000 Subject: [PATCH 2/4] ignore error from hash function --- .../datapointstorage/datapointcache.go | 48 ++++--------------- .../datapointstorage/datapointcache_test.go | 3 +- exporter/collector/metrics.go | 24 ++-------- 3 files changed, 15 insertions(+), 60 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index d45dc7385..f3fc70c2a 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -212,57 +212,29 @@ func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool { } // Identifier returns the unique string identifier for a metric. -func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) (uint64, error) { - var err error +func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) uint64 { h := fnv.New64() - - _, err = h.Write([]byte(resource.GetType())) - if err != nil { - return 0, err - } - _, err = h.Write([]byte(metric.Name())) - if err != nil { - return 0, err - } - + h.Write([]byte(resource.GetType())) + h.Write([]byte(metric.Name())) attrs := make(map[string]string) attributes.Range(func(k string, v pcommon.Value) bool { attrs[k] = v.AsString() return true }) - - err = hashOfMap(h, extraLabels) - if err != nil { - return 0, err - } - - err = hashOfMap(h, attrs) - if err != nil { - return 0, err - } - - err = hashOfMap(h, resource.GetLabels()) - if err != nil { - return 0, err - } - return h.Sum64(), err + hashOfMap(h, extraLabels) + hashOfMap(h, attrs) + hashOfMap(h, resource.GetLabels()) + return h.Sum64() } -func hashOfMap(h hash.Hash64, m map[string]string) error { +func hashOfMap(h hash.Hash64, m map[string]string) { keys := make([]string, 0, len(m)) for k := range m { keys = append(keys, k) } sort.Strings(keys) for _, key := range keys { - _, err := h.Write([]byte(key)) - if err != nil { - return err - } - _, err = h.Write([]byte(m[key])) - if err != nil { - return err - } + h.Write([]byte(key)) + h.Write([]byte(m[key])) } - return nil } diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go index a3d597268..5b06075ac 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache_test.go +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -327,8 +327,7 @@ func TestIdentifier(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { origLabels := pcommon.NewMap() tc.labels.CopyTo(origLabels) - got, err := Identifier(tc.resource, tc.extraLabels, tc.metric, tc.labels) - assert.NoError(t, err) + got := Identifier(tc.resource, tc.extraLabels, tc.metric, tc.labels) if tc.want != got { t.Errorf("Identifier() = %d; want %d", got, tc.want) } diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go index a9b4b0a59..f2e1abba0 100644 --- a/exporter/collector/metrics.go +++ b/exporter/collector/metrics.go @@ -875,11 +875,7 @@ func (m *metricMapper) summaryPointToTimeSeries( return nil } // Normalize the summary point. - metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - if err != nil { - m.obs.log.Debug("Failed to get identifier for summary metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) - return nil - } + metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) normalizedPoint, keep := m.normalizer.NormalizeSummaryDataPoint(point, metricIdentifier) if !keep { return nil @@ -1152,11 +1148,7 @@ func (m *metricMapper) histogramToTimeSeries( } if hist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize cumulative histogram points. - metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - if err != nil { - m.obs.log.Debug("Failed to get identifier for histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) - return nil - } + metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) normalizedPoint, keep := m.normalizer.NormalizeHistogramDataPoint(point, metricIdentifier) if !keep { return nil @@ -1210,11 +1202,7 @@ func (m *metricMapper) exponentialHistogramToTimeSeries( } if exponentialHist.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { // Normalize the histogram point. - metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - if err != nil { - m.obs.log.Debug("Failed to get identifier for exponential histogram metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) - return nil - } + metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) normalizedPoint, keep := m.normalizer.NormalizeExponentialHistogramDataPoint(point, metricIdentifier) if !keep { return nil @@ -1269,11 +1257,7 @@ func (m *metricMapper) sumPointToTimeSeries( } if sum.IsMonotonic() { if sum.AggregationTemporality() == pmetric.AggregationTemporalityCumulative { - metricIdentifier, err := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) - if err != nil { - m.obs.log.Debug("Failed to get identifier for sum metric. Dropping the metric.", zap.Error(err), zap.Any("metric", metric)) - return nil - } + metricIdentifier := datapointstorage.Identifier(resource, extraLabels, metric, point.Attributes()) normalizedPoint, keep := m.normalizer.NormalizeNumberDataPoint(point, metricIdentifier) if !keep { return nil From 60883fd69cedff7390576af580f6e23418661dbe Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 3 Sep 2024 19:31:58 +0000 Subject: [PATCH 3/4] add separators to ensure no collisions --- .../datapointstorage/datapointcache.go | 11 ++++++++++ .../datapointstorage/datapointcache_test.go | 22 ++++++++----------- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index f3fc70c2a..6d8bf1d5e 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -211,18 +211,27 @@ func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool { return true } +var ( + itemSep = []byte{'\xfe'} // Used between identifiers + KVsep = []byte{'\xff'} // Used between map keys and values +) + // Identifier returns the unique string identifier for a metric. func Identifier(resource *monitoredrespb.MonitoredResource, extraLabels map[string]string, metric pmetric.Metric, attributes pcommon.Map) uint64 { h := fnv.New64() h.Write([]byte(resource.GetType())) + h.Write(itemSep) h.Write([]byte(metric.Name())) + h.Write(itemSep) attrs := make(map[string]string) attributes.Range(func(k string, v pcommon.Value) bool { attrs[k] = v.AsString() return true }) hashOfMap(h, extraLabels) + h.Write(itemSep) hashOfMap(h, attrs) + h.Write(itemSep) hashOfMap(h, resource.GetLabels()) return h.Sum64() } @@ -235,6 +244,8 @@ func hashOfMap(h hash.Hash64, m map[string]string) { sort.Strings(keys) for _, key := range keys { h.Write([]byte(key)) + h.Write(KVsep) h.Write([]byte(m[key])) + h.Write(KVsep) } } diff --git a/exporter/collector/internal/datapointstorage/datapointcache_test.go b/exporter/collector/internal/datapointstorage/datapointcache_test.go index 5b06075ac..22415da39 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache_test.go +++ b/exporter/collector/internal/datapointstorage/datapointcache_test.go @@ -261,19 +261,15 @@ func TestIdentifier(t *testing.T) { metricWithName := pmetric.NewMetric() metricWithName.SetName("custom.googleapis.com/test.metric") dpWithAttributes := pmetric.NewNumberDataPoint() - dpWithAttributes.Attributes().PutStr("string", "strval") - dpWithAttributes.Attributes().PutBool("bool", true) - dpWithAttributes.Attributes().PutInt("int", 123) + dpWithAttributes.Attributes().PutStr("foo", "bar") monitoredResource := &monitoredrespb.MonitoredResource{ Type: "generic_task", Labels: map[string]string{ - "location": "us-central1-b", - "project": "project-foo", + "foo": "bar", }, } extraLabels := map[string]string{ - "foo": "bar", - "hello": "world", + "foo": "bar", } for _, tc := range []struct { resource *monitoredrespb.MonitoredResource @@ -285,39 +281,39 @@ func TestIdentifier(t *testing.T) { }{ { desc: "empty", - want: 14695981039346656037, + want: 16303200508382005237, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with name", - want: 4430695870278802542, + want: 9745949301560396956, metric: metricWithName, labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with attributes", - want: 12689322622784691089, + want: 2247677448538243046, metric: pmetric.NewMetric(), labels: dpWithAttributes.Attributes(), }, { desc: "with resource", - want: 10341971859444782974, + want: 4344837656395061793, resource: monitoredResource, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), }, { desc: "with extra labels", - want: 5179797973592387646, + want: 4054966704305404600, metric: pmetric.NewMetric(), labels: pmetric.NewNumberDataPoint().Attributes(), extraLabels: extraLabels, }, { desc: "with all", - want: 322569983349436668, + want: 2679814683341169356, metric: metricWithName, labels: dpWithAttributes.Attributes(), extraLabels: extraLabels, From 7de96ddb593c8b8c39ae85f7cfc7d4d7752cd1ae Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 4 Sep 2024 00:48:15 +0000 Subject: [PATCH 4/4] address comments --- .../collector/internal/datapointstorage/datapointcache.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/exporter/collector/internal/datapointstorage/datapointcache.go b/exporter/collector/internal/datapointstorage/datapointcache.go index 6d8bf1d5e..7b01c5789 100644 --- a/exporter/collector/internal/datapointstorage/datapointcache.go +++ b/exporter/collector/internal/datapointstorage/datapointcache.go @@ -211,9 +211,11 @@ func (c *Cache) gc(shutdown <-chan struct{}, tickerCh <-chan time.Time) bool { return true } +// Use the same constants as Prometheus uses for hashing: +// https://github.com/prometheus/prometheus/blob/282fb1632ad62a82401a230f486538a72384faf0/model/labels/labels_common.go#L32 var ( itemSep = []byte{'\xfe'} // Used between identifiers - KVsep = []byte{'\xff'} // Used between map keys and values + kvSep = []byte{'\xff'} // Used between map keys and values ) // Identifier returns the unique string identifier for a metric. @@ -244,8 +246,8 @@ func hashOfMap(h hash.Hash64, m map[string]string) { sort.Strings(keys) for _, key := range keys { h.Write([]byte(key)) - h.Write(KVsep) + h.Write(kvSep) h.Write([]byte(m[key])) - h.Write(KVsep) + h.Write(kvSep) } }