Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Add profile segment and reason for flush metrics #608

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,13 @@ func (h *Head) loop() {
for {
select {
case <-h.flushForcedTimer.C:
h.metrics.flushedBlocksReasons.WithLabelValues("max-duration").Inc()
level.Debug(h.logger).Log("msg", "max block duration reached, flush to disk")
close(h.flushCh)
return
case <-tick.C:
if currentSize := h.Size(); currentSize > h.parquetConfig.MaxBlockBytes {
h.metrics.flushedBlocksReasons.WithLabelValues("max-block-bytes").Inc()
level.Debug(h.logger).Log(
"msg", "max block bytes reached, flush to disk",
"max_size", humanize.Bytes(h.parquetConfig.MaxBlockBytes),
Expand Down
20 changes: 20 additions & 0 deletions pkg/phlaredb/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ type headMetrics struct {
flusehdBlockProfiles prometheus.Histogram
blockDurationSeconds prometheus.Histogram
flushedBlocks *prometheus.CounterVec
flushedBlocksReasons *prometheus.CounterVec
writtenProfileSegments *prometheus.CounterVec
writtenProfileSegmentsBytes prometheus.Histogram
}

func newHeadMetrics(reg prometheus.Registerer) *headMetrics {
Expand Down Expand Up @@ -128,6 +131,20 @@ func newHeadMetrics(reg prometheus.Registerer) *headMetrics {
Name: "phlare_head_flushed_blocks_total",
Help: "Total number of blocks flushed.",
}, []string{"status"}),
flushedBlocksReasons: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "phlare_head_flushed_reason_total",
Help: "Total count of reasons why block has been flushed.",
}, []string{"reason"}),
writtenProfileSegments: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "phlare_head_written_profile_segments_total",
Help: "Total number and status of profile row groups segments written.",
}, []string{"status"}),
writtenProfileSegmentsBytes: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "phlare_head_written_profile_segments_size_bytes",
Help: "Size of a flushed table in bytes.",
// [512KB, 1MB, 2MB, 4MB, 8MB, 16MB, 32MB, 64MB, 128MB, 256MB, 512MB]
Buckets: prometheus.ExponentialBuckets(512*1024, 2, 11),
}),
}

m.register(reg)
Expand All @@ -154,6 +171,9 @@ func (m *headMetrics) register(reg prometheus.Registerer) {
m.flusehdBlockProfiles = util.RegisterOrGet(reg, m.flusehdBlockProfiles)
m.blockDurationSeconds = util.RegisterOrGet(reg, m.blockDurationSeconds)
m.flushedBlocks = util.RegisterOrGet(reg, m.flushedBlocks)
m.flushedBlocksReasons = util.RegisterOrGet(reg, m.flushedBlocksReasons)
m.writtenProfileSegments = util.RegisterOrGet(reg, m.writtenProfileSegments)
m.writtenProfileSegmentsBytes = util.RegisterOrGet(reg, m.writtenProfileSegmentsBytes)
}

func contextWithHeadMetrics(ctx context.Context, m *headMetrics) context.Context {
Expand Down
19 changes: 16 additions & 3 deletions pkg/phlaredb/profile_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *profileStore) Flush(ctx context.Context) (numRows uint64, numRowGroups
return numRows, numRowGroups, nil
}

func (s *profileStore) prepareFile(path string) (closer io.Closer, err error) {
func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
if err != nil {
return nil, err
Expand All @@ -207,6 +207,13 @@ func (s *profileStore) empty() bool {
// cutRowGroup gets called when a particular row group has been finished, and it will flush it to disk. The caller of cutRowGroup should be holding the write lock.
// TODO: write row groups asynchronously
func (s *profileStore) cutRowGroup() (err error) {
// if cutRowGroup fails record it as failed segment
defer func() {
if err != nil {
s.metrics.writtenProfileSegments.WithLabelValues("failed").Inc()
}
}()

// do nothing with empty buffer
bufferRowNums := len(s.slice)
if bufferRowNums == 0 {
Expand All @@ -218,7 +225,7 @@ func (s *profileStore) cutRowGroup() (err error) {
fmt.Sprintf("%s.%d%s", s.persister.Name(), s.rowsFlushed, block.ParquetSuffix),
)

fileCloser, err := s.prepareFile(path)
f, err := s.prepareFile(path)
if err != nil {
return err
}
Expand All @@ -235,9 +242,15 @@ func (s *profileStore) cutRowGroup() (err error) {
return errors.Wrap(err, "close row group segment writer")
}

if err := fileCloser.Close(); err != nil {
if err := f.Close(); err != nil {
return errors.Wrap(err, "closing row group segment file")
}
s.metrics.writtenProfileSegments.WithLabelValues("success").Inc()

// get row group segment size on disk
// NOTE(review): f.Close() was already called above, and os.File.Stat on a
// closed file returns an error, so this branch never observes anything —
// stat before closing (or use os.Stat(path)) so the metric is recorded.
if stat, err := f.Stat(); err == nil {
s.metrics.writtenProfileSegmentsBytes.Observe(float64(stat.Size()))
}

s.rowsFlushed += uint64(n)

Expand Down