diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index b0a09ca2e..d8939b47f 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/prometheus/client_golang/prometheus" "github.com/samber/lo" "github.com/stretchr/testify/require" @@ -20,7 +21,7 @@ func TestInMemoryReader(t *testing.T) { MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount / 1024, MaxRowGroupBytes: defaultParquetConfig.MaxRowGroupBytes / 1024, MaxBlockBytes: defaultParquetConfig.MaxBlockBytes, - })) + }, newHeadMetrics(prometheus.NewRegistry()))) rewrites := &rewriter{} rgCount := 5 for i := 0; i < rgCount*st.cfg.MaxBufferRowCount; i++ { diff --git a/pkg/phlaredb/deduplicating_slice.go b/pkg/phlaredb/deduplicating_slice.go index 8dfe5d8a7..4be7431c1 100644 --- a/pkg/phlaredb/deduplicating_slice.go +++ b/pkg/phlaredb/deduplicating_slice.go @@ -37,9 +37,10 @@ type deduplicatingSlice[M Models, K comparable, H Helper[M, K], P schemav1.Persi persister P helper H - file *os.File - cfg *ParquetConfig - writer *parquet.GenericWriter[P] + file *os.File + cfg *ParquetConfig + metrics *headMetrics + writer *parquet.GenericWriter[P] buffer *parquet.Buffer rowsFlushed int @@ -57,8 +58,9 @@ func (s *deduplicatingSlice[M, K, H, P]) Size() uint64 { return s.size.Load() } -func (s *deduplicatingSlice[M, K, H, P]) Init(path string, cfg *ParquetConfig) error { +func (s *deduplicatingSlice[M, K, H, P]) Init(path string, cfg *ParquetConfig, metrics *headMetrics) error { s.cfg = cfg + s.metrics = metrics file, err := os.OpenFile(filepath.Join(path, s.persister.Name()+block.ParquetSuffix), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) if err != nil { return err @@ -214,7 +216,7 @@ func (s *deduplicatingSlice[M, K, H, P]) ingest(_ context.Context, elems []M, re posSlice++ // increase size of stored data - s.size.Add(s.helper.size(elems[pos])) + s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(s.size.Add(s.helper.size(elems[pos])))) } s.lock.Unlock() } diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index 5e4b2f586..75a532520 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -111,7 +111,7 @@ type Table interface { Name() string Size() uint64 // Size estimates the uncompressed byte size of the table in memory and on disk. MemorySize() uint64 // MemorySize estimates the uncompressed byte size of the table in memory. - Init(path string, cfg *ParquetConfig) error + Init(path string, cfg *ParquetConfig, metrics *headMetrics) error Flush(context.Context) (numRows uint64, numRowGroups uint64, err error) Close() error } @@ -173,7 +173,6 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea } h.headPath = filepath.Join(cfg.DataPath, pathHead, h.meta.ULID.String()) h.localPath = filepath.Join(cfg.DataPath, pathLocal, h.meta.ULID.String()) - h.metrics.setHead(h) if cfg.Parquet != nil { h.parquetConfig = cfg.Parquet @@ -199,7 +198,7 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea h.profiles, } for _, t := range h.tables { - if err := t.Init(h.headPath, h.parquetConfig); err != nil { + if err := t.Init(h.headPath, h.parquetConfig, h.metrics); err != nil { return nil, err } } diff --git a/pkg/phlaredb/metrics.go b/pkg/phlaredb/metrics.go index bcbc45463..c2471b5be 100644 --- a/pkg/phlaredb/metrics.go +++ b/pkg/phlaredb/metrics.go @@ -4,10 +4,10 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" phlarecontext "github.com/grafana/phlare/pkg/phlare/context" "github.com/grafana/phlare/pkg/phlaredb/query" + "github.com/grafana/phlare/pkg/util" ) type contextKey uint8 @@ -18,12 +18,10 @@ const ( ) type headMetrics struct { - head *Head - - series prometheus.GaugeFunc + series prometheus.Gauge seriesCreated *prometheus.CounterVec - profiles prometheus.GaugeFunc + profiles prometheus.Gauge profilesCreated *prometheus.CounterVec sizeBytes *prometheus.GaugeVec @@ -35,70 +33,66 @@ type headMetrics struct { func newHeadMetrics(reg prometheus.Registerer) *headMetrics { m := &headMetrics{ - seriesCreated: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + seriesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "phlare_tsdb_head_series_created_total", Help: "Total number of series created in the head", }, []string{"profile_name"}), - rowsWritten: promauto.With(reg).NewCounterVec( + rowsWritten: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "phlare_rows_written", Help: "Number of rows written to a parquet table.", }, []string{"type"}), - profilesCreated: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + profilesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "phlare_head_profiles_created_total", Help: "Total number of profiles created in the head", }, []string{"profile_name"}), - sampleValuesIngested: promauto.With(reg).NewCounterVec( + sampleValuesIngested: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "phlare_head_ingested_sample_values_total", Help: "Number of sample values ingested into the head per profile type.", }, []string{"profile_name"}), - sampleValuesReceived: promauto.With(reg).NewCounterVec( + sampleValuesReceived: prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "phlare_head_received_sample_values_total", Help: "Number of sample values received into the head per profile type.", }, []string{"profile_name"}), - - // this metric is not registered using promauto, as it has a callback into the header sizeBytes: prometheus.NewGaugeVec( prometheus.GaugeOpts{ Name: "phlare_head_size_bytes", Help: "Size of a particular in memory store within the head phlaredb block.", }, []string{"type"}), + series: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "phlare_tsdb_head_series", + Help: "Total number of series in the head block.", + }), + profiles: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "phlare_head_profiles", + Help: "Total number of profiles in the head block.", + }), } - // metrics that call into the head - m.series = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ - Name: "phlare_tsdb_head_series", - Help: "Total number of series in the head block.", - }, func() float64 { - if m.head == nil { - return 0.0 - } - return float64(m.head.profiles.index.totalSeries.Load()) - }) - m.profiles = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{ - Name: "phlare_head_profiles", - Help: "Total number of profiles in the head block.", - }, func() float64 { - if m.head == nil { - return 0.0 - } - return float64(m.head.profiles.index.totalProfiles.Load()) - }) - - if reg != nil { - reg.MustRegister( - m, - ) - } + m.register(reg) return m } +func (m *headMetrics) register(reg prometheus.Registerer) { + if reg == nil { + return + } + m.series = util.RegisterOrGet(reg, m.series) + m.seriesCreated = util.RegisterOrGet(reg, m.seriesCreated) + m.profiles = util.RegisterOrGet(reg, m.profiles) + m.profilesCreated = util.RegisterOrGet(reg, m.profilesCreated) + m.sizeBytes = util.RegisterOrGet(reg, m.sizeBytes) + m.rowsWritten = util.RegisterOrGet(reg, m.rowsWritten) + m.sampleValuesIngested = util.RegisterOrGet(reg, m.sampleValuesIngested) + m.sampleValuesReceived = util.RegisterOrGet(reg, m.sampleValuesReceived) +} + func contextWithHeadMetrics(ctx context.Context, m *headMetrics) context.Context { return context.WithValue(ctx, headMetricsContextKey, m) } @@ -111,24 +105,6 @@ func contextHeadMetrics(ctx context.Context) *headMetrics { return m } -func (m *headMetrics) setHead(head *Head) *headMetrics { - m.head = head - return m -} - -func (m *headMetrics) Describe(ch chan<- *prometheus.Desc) { - m.sizeBytes.Describe(ch) -} - -func (m *headMetrics) Collect(ch chan<- prometheus.Metric) { - if m.head != nil { - for _, t := range m.head.tables { - m.sizeBytes.WithLabelValues(t.Name()).Set(float64(t.MemorySize())) - } - } - m.sizeBytes.Collect(ch) -} - type blocksMetrics struct { query *query.Metrics @@ -136,13 +112,15 @@ type blocksMetrics struct { } func newBlocksMetrics(reg prometheus.Registerer) *blocksMetrics { - return &blocksMetrics{ + m := &blocksMetrics{ query: query.NewMetrics(reg), - blockOpeningLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + blockOpeningLatency: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "phlaredb_block_opening_duration", Help: "Latency of opening a block in seconds", }), } + m.blockOpeningLatency = util.RegisterOrGet(reg, m.blockOpeningLatency) + return m } func contextWithBlockMetrics(ctx context.Context, m *blocksMetrics) context.Context { diff --git a/pkg/phlaredb/metrics_test.go b/pkg/phlaredb/metrics_test.go new file mode 100644 index 000000000..45fb92159 --- /dev/null +++ b/pkg/phlaredb/metrics_test.go @@ -0,0 +1,26 @@ +package phlaredb + +import ( + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestMultipleRegistrationMetrics(t *testing.T) { + reg := prometheus.NewRegistry() + m1 := newHeadMetrics(reg) + m2 := newHeadMetrics(reg) + + m1.profilesCreated.WithLabelValues("test").Inc() + m2.profilesCreated.WithLabelValues("test").Inc() + + // collect metrics and compare them + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(` +# HELP phlare_head_profiles_created_total Total number of profiles created in the head +# TYPE phlare_head_profiles_created_total counter +phlare_head_profiles_created_total{profile_name="test"} 2 +`), "phlare_head_profiles_created_total")) +} diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index f9ee30a48..021679dfe 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -80,7 +80,7 @@ func (s *profileStore) MemorySize() uint64 { } // resets the store -func (s *profileStore) Init(path string, cfg *ParquetConfig) (err error) { +func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetrics) (err error) { // close previous iteration if err := s.Close(); err != nil { return err @@ -97,6 +97,7 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig) (err error) { s.path = path s.cfg = cfg + s.metrics = metrics s.slice = s.slice[:0] @@ -256,7 +257,7 @@ func (s *profileStore) cutRowGroup() (err error) { // reset slice and metrics s.slice = s.slice[:0] s.size.Store(0) - + s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(0) return nil } @@ -312,7 +313,7 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l // increase size of stored data addedBytes := s.helper.size(profiles[pos]) - s.size.Add(addedBytes) + s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(s.size.Add(addedBytes))) s.totalSize.Add(addedBytes) // add to slice diff --git a/pkg/phlaredb/profile_store_test.go b/pkg/phlaredb/profile_store_test.go index 05b1a53ab..46adf5db7 100644 --- a/pkg/phlaredb/profile_store_test.go +++ b/pkg/phlaredb/profile_store_test.go @@ -141,7 +141,7 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { path := t.TempDir() - require.NoError(t, store.Init(path, tc.cfg)) + require.NoError(t, store.Init(path, tc.cfg, newHeadMetrics(prometheus.NewRegistry()))) for i := 0; i < 100; i++ { p := tc.values(i) @@ -193,7 +193,7 @@ func TestProfileStore_Ingestion_SeriesIndexes(t *testing.T) { store = newProfileStore(ctx) ) path := t.TempDir() - require.NoError(t, store.Init(path, defaultParquetConfig)) + require.NoError(t, store.Init(path, defaultParquetConfig, newHeadMetrics(prometheus.NewRegistry()))) for i := 0; i < 9; i++ { p := threeProfileStreams(i) diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index c685079f6..91b77b885 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -178,7 +178,7 @@ func (pi *profilesIndex) Add(ps *schemav1.Profile, lbs phlaremodel.Labels, profi profilesOnDisk: make([]*rowRange, pi.rowGroupsOnDisk), } pi.profilesPerFP[ps.SeriesFingerprint] = profiles - pi.totalSeries.Inc() + pi.metrics.series.Set(float64(pi.totalSeries.Inc())) pi.metrics.seriesCreated.WithLabelValues(profileName).Inc() } @@ -190,7 +190,7 @@ func (pi *profilesIndex) Add(ps *schemav1.Profile, lbs phlaremodel.Labels, profi profiles.maxTime = ps.TimeNanos } - pi.totalProfiles.Inc() + pi.metrics.profiles.Set(float64(pi.totalProfiles.Inc())) pi.metrics.profilesCreated.WithLabelValues(profileName).Inc() } diff --git a/pkg/phlaredb/querier_test.go b/pkg/phlaredb/querier_test.go index a90c418da..335964ffa 100644 --- a/pkg/phlaredb/querier_test.go +++ b/pkg/phlaredb/querier_test.go @@ -19,9 +19,7 @@ import ( ) func TestQueryIndex(t *testing.T) { - head := newTestHead(t) - - a, err := newProfileIndex(32, newHeadMetrics(prometheus.NewRegistry()).setHead(head.Head)) + a, err := newProfileIndex(32, newHeadMetrics(prometheus.NewRegistry())) require.NoError(t, err) for j := 0; j < 10; j++ { diff --git a/pkg/phlaredb/query/metrics.go b/pkg/phlaredb/query/metrics.go index f341f0794..af4fdb2f1 100644 --- a/pkg/phlaredb/query/metrics.go +++ b/pkg/phlaredb/query/metrics.go @@ -4,7 +4,8 @@ import ( "context" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/phlare/pkg/util" ) type contextKey uint8 @@ -18,12 +19,14 @@ type Metrics struct { } func NewMetrics(reg prometheus.Registerer) *Metrics { - return &Metrics{ - pageReadsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + m := &Metrics{ + pageReadsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "phlaredb_page_reads_total", Help: "Total number of pages read while querying", }, []string{"table", "column"}), } + m.pageReadsTotal = util.RegisterOrGet(reg, m.pageReadsTotal) + return m } func AddMetricsToContext(ctx context.Context, m *Metrics) context.Context { diff --git a/pkg/util/prometheus.go b/pkg/util/prometheus.go new file mode 100644 index 000000000..1143571db --- /dev/null +++ b/pkg/util/prometheus.go @@ -0,0 +1,23 @@ +package util + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +// RegisterOrGet registers the collector c with the provided registerer. +// If the registerer is nil, the collector is returned without registration. +// If the collector is already registered, the existing collector is returned. +func RegisterOrGet[T prometheus.Collector](reg prometheus.Registerer, c T) T { + if reg == nil { + return c + } + err := reg.Register(c) + if err != nil { + already, ok := err.(prometheus.AlreadyRegisteredError) + if ok { + return already.ExistingCollector.(T) + } + panic(err) + } + return c +}