Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Refactor metrics in PhlareDB #574

Merged
merged 2 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/phlaredb/block_querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
"github.com/stretchr/testify/require"

Expand All @@ -20,7 +21,7 @@ func TestInMemoryReader(t *testing.T) {
MaxBufferRowCount: defaultParquetConfig.MaxBufferRowCount / 1024,
MaxRowGroupBytes: defaultParquetConfig.MaxRowGroupBytes / 1024,
MaxBlockBytes: defaultParquetConfig.MaxBlockBytes,
}))
}, newHeadMetrics(prometheus.NewRegistry())))
rewrites := &rewriter{}
rgCount := 5
for i := 0; i < rgCount*st.cfg.MaxBufferRowCount; i++ {
Expand Down
12 changes: 7 additions & 5 deletions pkg/phlaredb/deduplicating_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ type deduplicatingSlice[M Models, K comparable, H Helper[M, K], P schemav1.Persi
persister P
helper H

file *os.File
cfg *ParquetConfig
writer *parquet.GenericWriter[P]
file *os.File
cfg *ParquetConfig
metrics *headMetrics
writer *parquet.GenericWriter[P]

buffer *parquet.Buffer
rowsFlushed int
Expand All @@ -57,8 +58,9 @@ func (s *deduplicatingSlice[M, K, H, P]) Size() uint64 {
return s.size.Load()
}

func (s *deduplicatingSlice[M, K, H, P]) Init(path string, cfg *ParquetConfig) error {
func (s *deduplicatingSlice[M, K, H, P]) Init(path string, cfg *ParquetConfig, metrics *headMetrics) error {
s.cfg = cfg
s.metrics = metrics
file, err := os.OpenFile(filepath.Join(path, s.persister.Name()+block.ParquetSuffix), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
if err != nil {
return err
Expand Down Expand Up @@ -214,7 +216,7 @@ func (s *deduplicatingSlice[M, K, H, P]) ingest(_ context.Context, elems []M, re
posSlice++

// increase size of stored data
s.size.Add(s.helper.size(elems[pos]))
s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(s.size.Add(s.helper.size(elems[pos]))))
}
s.lock.Unlock()
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ type Table interface {
Name() string
Size() uint64 // Size estimates the uncompressed byte size of the table in memory and on disk.
MemorySize() uint64 // MemorySize estimates the uncompressed byte size of the table in memory.
Init(path string, cfg *ParquetConfig) error
Init(path string, cfg *ParquetConfig, metrics *headMetrics) error
Flush(context.Context) (numRows uint64, numRowGroups uint64, err error)
Close() error
}
Expand Down Expand Up @@ -173,7 +173,6 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea
}
h.headPath = filepath.Join(cfg.DataPath, pathHead, h.meta.ULID.String())
h.localPath = filepath.Join(cfg.DataPath, pathLocal, h.meta.ULID.String())
h.metrics.setHead(h)

if cfg.Parquet != nil {
h.parquetConfig = cfg.Parquet
Expand All @@ -199,7 +198,7 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea
h.profiles,
}
for _, t := range h.tables {
if err := t.Init(h.headPath, h.parquetConfig); err != nil {
if err := t.Init(h.headPath, h.parquetConfig, h.metrics); err != nil {
return nil, err
}
}
Expand Down
92 changes: 35 additions & 57 deletions pkg/phlaredb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

phlarecontext "github.com/grafana/phlare/pkg/phlare/context"
"github.com/grafana/phlare/pkg/phlaredb/query"
"github.com/grafana/phlare/pkg/util"
)

type contextKey uint8
Expand All @@ -18,12 +18,10 @@ const (
)

type headMetrics struct {
head *Head

series prometheus.GaugeFunc
series prometheus.Gauge
seriesCreated *prometheus.CounterVec

profiles prometheus.GaugeFunc
profiles prometheus.Gauge
profilesCreated *prometheus.CounterVec

sizeBytes *prometheus.GaugeVec
Expand All @@ -35,70 +33,66 @@ type headMetrics struct {

func newHeadMetrics(reg prometheus.Registerer) *headMetrics {
m := &headMetrics{
seriesCreated: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
seriesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "phlare_tsdb_head_series_created_total",
Help: "Total number of series created in the head",
}, []string{"profile_name"}),
rowsWritten: promauto.With(reg).NewCounterVec(
rowsWritten: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "phlare_rows_written",
Help: "Number of rows written to a parquet table.",
},
[]string{"type"}),
profilesCreated: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
profilesCreated: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "phlare_head_profiles_created_total",
Help: "Total number of profiles created in the head",
}, []string{"profile_name"}),
sampleValuesIngested: promauto.With(reg).NewCounterVec(
sampleValuesIngested: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "phlare_head_ingested_sample_values_total",
Help: "Number of sample values ingested into the head per profile type.",
},
[]string{"profile_name"}),
sampleValuesReceived: promauto.With(reg).NewCounterVec(
sampleValuesReceived: prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "phlare_head_received_sample_values_total",
Help: "Number of sample values received into the head per profile type.",
},
[]string{"profile_name"}),

// this metric is not registered using promauto, as it has a callback into the header
sizeBytes: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "phlare_head_size_bytes",
Help: "Size of a particular in memory store within the head phlaredb block.",
},
[]string{"type"}),
series: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "phlare_tsdb_head_series",
Help: "Total number of series in the head block.",
}),
profiles: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "phlare_head_profiles",
Help: "Total number of profiles in the head block.",
}),
}

// metrics that call into the head
m.series = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "phlare_tsdb_head_series",
Help: "Total number of series in the head block.",
}, func() float64 {
if m.head == nil {
return 0.0
}
return float64(m.head.profiles.index.totalSeries.Load())
})
m.profiles = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "phlare_head_profiles",
Help: "Total number of profiles in the head block.",
}, func() float64 {
if m.head == nil {
return 0.0
}
return float64(m.head.profiles.index.totalProfiles.Load())
})

if reg != nil {
reg.MustRegister(
m,
)
}
m.register(reg)
return m
}

func (m *headMetrics) register(reg prometheus.Registerer) {
if reg == nil {
return
}
m.series = util.RegisterOrGet(reg, m.series)
m.seriesCreated = util.RegisterOrGet(reg, m.seriesCreated)
m.profiles = util.RegisterOrGet(reg, m.profiles)
m.profilesCreated = util.RegisterOrGet(reg, m.profilesCreated)
m.sizeBytes = util.RegisterOrGet(reg, m.sizeBytes)
m.rowsWritten = util.RegisterOrGet(reg, m.rowsWritten)
m.sampleValuesIngested = util.RegisterOrGet(reg, m.sampleValuesIngested)
m.sampleValuesReceived = util.RegisterOrGet(reg, m.sampleValuesReceived)
}

func contextWithHeadMetrics(ctx context.Context, m *headMetrics) context.Context {
return context.WithValue(ctx, headMetricsContextKey, m)
}
Expand All @@ -111,38 +105,22 @@ func contextHeadMetrics(ctx context.Context) *headMetrics {
return m
}

func (m *headMetrics) setHead(head *Head) *headMetrics {
m.head = head
return m
}

func (m *headMetrics) Describe(ch chan<- *prometheus.Desc) {
m.sizeBytes.Describe(ch)
}

func (m *headMetrics) Collect(ch chan<- prometheus.Metric) {
if m.head != nil {
for _, t := range m.head.tables {
m.sizeBytes.WithLabelValues(t.Name()).Set(float64(t.MemorySize()))
}
}
m.sizeBytes.Collect(ch)
}

type blocksMetrics struct {
query *query.Metrics

blockOpeningLatency prometheus.Histogram
}

func newBlocksMetrics(reg prometheus.Registerer) *blocksMetrics {
return &blocksMetrics{
m := &blocksMetrics{
query: query.NewMetrics(reg),
blockOpeningLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
blockOpeningLatency: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "phlaredb_block_opening_duration",
Help: "Latency of opening a block in seconds",
}),
}
m.blockOpeningLatency = util.RegisterOrGet(reg, m.blockOpeningLatency)
return m
}

func contextWithBlockMetrics(ctx context.Context, m *blocksMetrics) context.Context {
Expand Down
26 changes: 26 additions & 0 deletions pkg/phlaredb/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package phlaredb

import (
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)

func TestMultipleRegistrationMetrics(t *testing.T) {
reg := prometheus.NewRegistry()
m1 := newHeadMetrics(reg)
m2 := newHeadMetrics(reg)

m1.profilesCreated.WithLabelValues("test").Inc()
m2.profilesCreated.WithLabelValues("test").Inc()

// collect metrics and compare them
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP phlare_head_profiles_created_total Total number of profiles created in the head
# TYPE phlare_head_profiles_created_total counter
phlare_head_profiles_created_total{profile_name="test"} 2
`), "phlare_head_profiles_created_total"))
}
7 changes: 4 additions & 3 deletions pkg/phlaredb/profile_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *profileStore) MemorySize() uint64 {
}

// resets the store
func (s *profileStore) Init(path string, cfg *ParquetConfig) (err error) {
func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetrics) (err error) {
// close previous iteration
if err := s.Close(); err != nil {
return err
Expand All @@ -97,6 +97,7 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig) (err error) {

s.path = path
s.cfg = cfg
s.metrics = metrics

s.slice = s.slice[:0]

Expand Down Expand Up @@ -256,7 +257,7 @@ func (s *profileStore) cutRowGroup() (err error) {
// reset slice and metrics
s.slice = s.slice[:0]
s.size.Store(0)

s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(0)
return nil
}

Expand Down Expand Up @@ -312,7 +313,7 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l

// increase size of stored data
addedBytes := s.helper.size(profiles[pos])
s.size.Add(addedBytes)
s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(s.size.Add(addedBytes)))
s.totalSize.Add(addedBytes)

// add to slice
Expand Down
4 changes: 2 additions & 2 deletions pkg/phlaredb/profile_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {
} {
t.Run(tc.name, func(t *testing.T) {
path := t.TempDir()
require.NoError(t, store.Init(path, tc.cfg))
require.NoError(t, store.Init(path, tc.cfg, newHeadMetrics(prometheus.NewRegistry())))

for i := 0; i < 100; i++ {
p := tc.values(i)
Expand Down Expand Up @@ -193,7 +193,7 @@ func TestProfileStore_Ingestion_SeriesIndexes(t *testing.T) {
store = newProfileStore(ctx)
)
path := t.TempDir()
require.NoError(t, store.Init(path, defaultParquetConfig))
require.NoError(t, store.Init(path, defaultParquetConfig, newHeadMetrics(prometheus.NewRegistry())))

for i := 0; i < 9; i++ {
p := threeProfileStreams(i)
Expand Down
4 changes: 2 additions & 2 deletions pkg/phlaredb/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (pi *profilesIndex) Add(ps *schemav1.Profile, lbs phlaremodel.Labels, profi
profilesOnDisk: make([]*rowRange, pi.rowGroupsOnDisk),
}
pi.profilesPerFP[ps.SeriesFingerprint] = profiles
pi.totalSeries.Inc()
pi.metrics.series.Set(float64(pi.totalSeries.Inc()))
pi.metrics.seriesCreated.WithLabelValues(profileName).Inc()
}

Expand All @@ -190,7 +190,7 @@ func (pi *profilesIndex) Add(ps *schemav1.Profile, lbs phlaremodel.Labels, profi
profiles.maxTime = ps.TimeNanos
}

pi.totalProfiles.Inc()
pi.metrics.profiles.Set(float64(pi.totalProfiles.Inc()))
pi.metrics.profilesCreated.WithLabelValues(profileName).Inc()
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/phlaredb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@ import (
)

func TestQueryIndex(t *testing.T) {
head := newTestHead(t)

a, err := newProfileIndex(32, newHeadMetrics(prometheus.NewRegistry()).setHead(head.Head))
a, err := newProfileIndex(32, newHeadMetrics(prometheus.NewRegistry()))
require.NoError(t, err)

for j := 0; j < 10; j++ {
Expand Down
9 changes: 6 additions & 3 deletions pkg/phlaredb/query/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/phlare/pkg/util"
)

type contextKey uint8
Expand All @@ -18,12 +19,14 @@ type Metrics struct {
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
return &Metrics{
pageReadsTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
m := &Metrics{
pageReadsTotal: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "phlaredb_page_reads_total",
Help: "Total number of pages read while querying",
}, []string{"table", "column"}),
}
m.pageReadsTotal = util.RegisterOrGet(reg, m.pageReadsTotal)
return m
}

func AddMetricsToContext(ctx context.Context, m *Metrics) context.Context {
Expand Down
Loading