Add profile segment and reason for flush metrics (#608)
simonswine authored Apr 3, 2023
1 parent 1e36409 · commit 4c6b8e0
Showing 3 changed files with 38 additions and 3 deletions.
pkg/phlaredb/head.go: 2 additions & 0 deletions
@@ -246,11 +246,13 @@ func (h *Head) loop() {
 	for {
 		select {
 		case <-h.flushForcedTimer.C:
+			h.metrics.flushedBlocksReasons.WithLabelValues("max-duration").Inc()
 			level.Debug(h.logger).Log("msg", "max block duration reached, flush to disk")
 			close(h.flushCh)
 			return
 		case <-tick.C:
 			if currentSize := h.Size(); currentSize > h.parquetConfig.MaxBlockBytes {
+				h.metrics.flushedBlocksReasons.WithLabelValues("max-block-bytes").Inc()
 				level.Debug(h.logger).Log(
 					"msg", "max block bytes reached, flush to disk",
 					"max_size", humanize.Bytes(h.parquetConfig.MaxBlockBytes),
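For context, here is a minimal standalone sketch (not part of this commit) of how a reason-labeled CounterVec like flushedBlocksReasons behaves: each distinct label value becomes its own series, so the two flush triggers above are counted independently.

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Same shape as the metric added in this commit: one "reason" label.
	reasons := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "phlare_head_flushed_reason_total",
		Help: "Total count of reasons why block has been flushed.",
	}, []string{"reason"})

	// One flush hit the max-duration timer, two hit the size limit.
	reasons.WithLabelValues("max-duration").Inc()
	reasons.WithLabelValues("max-block-bytes").Inc()
	reasons.WithLabelValues("max-block-bytes").Inc()

	fmt.Println(testutil.ToFloat64(reasons.WithLabelValues("max-duration")))    // 1
	fmt.Println(testutil.ToFloat64(reasons.WithLabelValues("max-block-bytes"))) // 2
}
```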
pkg/phlaredb/metrics.go: 20 additions & 0 deletions
@@ -38,6 +38,9 @@ type headMetrics struct {
 	flusehdBlockProfiles        prometheus.Histogram
 	blockDurationSeconds        prometheus.Histogram
 	flushedBlocks               *prometheus.CounterVec
+	flushedBlocksReasons        *prometheus.CounterVec
+	writtenProfileSegments      *prometheus.CounterVec
+	writtenProfileSegmentsBytes prometheus.Histogram
 }
 
 func newHeadMetrics(reg prometheus.Registerer) *headMetrics {
@@ -128,6 +131,20 @@ func newHeadMetrics(reg prometheus.Registerer) *headMetrics {
 			Name: "phlare_head_flushed_blocks_total",
 			Help: "Total number of blocks flushed.",
 		}, []string{"status"}),
+		flushedBlocksReasons: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "phlare_head_flushed_reason_total",
+			Help: "Total count of reasons why block has been flushed.",
+		}, []string{"reason"}),
+		writtenProfileSegments: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "phlare_head_written_profile_segments_total",
+			Help: "Total number and status of profile row groups segments written.",
+		}, []string{"status"}),
+		writtenProfileSegmentsBytes: prometheus.NewHistogram(prometheus.HistogramOpts{
+			Name: "phlare_head_written_profile_segments_size_bytes",
+			Help: "Size of a flushed table in bytes.",
+			// [512KB, 1MB, 2MB, 4MB, 8MB, 16MB, 32MB, 64MB, 128MB, 256MB, 512MB]
+			Buckets: prometheus.ExponentialBuckets(512*1024, 2, 11),
+		}),
 	}
 
 	m.register(reg)
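To sanity-check the bucket comment above: prometheus.ExponentialBuckets(start, factor, count) returns count upper bounds starting at start and multiplying by factor, so (512*1024, 2, 11) yields exactly the 512 KB to 512 MB ladder listed. A throwaway sketch:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Doubles from 512 KiB ten times: 512 KiB, 1 MiB, 2 MiB, ..., 512 MiB.
	for _, upperBound := range prometheus.ExponentialBuckets(512*1024, 2, 11) {
		fmt.Printf("%.0f KiB\n", upperBound/1024)
	}
}
```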
@@ -154,6 +171,9 @@ func (m *headMetrics) register(reg prometheus.Registerer) {
 	m.flusehdBlockProfiles = util.RegisterOrGet(reg, m.flusehdBlockProfiles)
 	m.blockDurationSeconds = util.RegisterOrGet(reg, m.blockDurationSeconds)
 	m.flushedBlocks = util.RegisterOrGet(reg, m.flushedBlocks)
+	m.flushedBlocksReasons = util.RegisterOrGet(reg, m.flushedBlocksReasons)
+	m.writtenProfileSegments = util.RegisterOrGet(reg, m.writtenProfileSegments)
+	m.writtenProfileSegmentsBytes = util.RegisterOrGet(reg, m.writtenProfileSegmentsBytes)
 }
 
 func contextWithHeadMetrics(ctx context.Context, m *headMetrics) context.Context {
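util.RegisterOrGet itself is not shown in this diff. A plausible sketch of such a helper (an assumption, not the repository's actual implementation) is the standard client_golang pattern of catching prometheus.AlreadyRegisteredError and returning the existing collector, which makes repeated registration idempotent:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// registerOrGet registers c, or returns the identical collector that is
// already registered. Hypothetical stand-in for util.RegisterOrGet.
func registerOrGet[T prometheus.Collector](reg prometheus.Registerer, c T) T {
	if err := reg.Register(c); err != nil {
		var already prometheus.AlreadyRegisteredError
		if errors.As(err, &already) {
			return already.ExistingCollector.(T)
		}
		panic(err)
	}
	return c
}

func main() {
	reg := prometheus.NewRegistry()
	a := registerOrGet(reg, prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_total"}))
	b := registerOrGet(reg, prometheus.NewCounter(prometheus.CounterOpts{Name: "demo_total"}))
	fmt.Println(a == b) // true: the second call returned the first collector
}
```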
pkg/phlaredb/profile_store.go: 16 additions & 3 deletions
@@ -188,7 +188,7 @@ func (s *profileStore) Flush(ctx context.Context) (numRows uint64, numRowGroups
 	return numRows, numRowGroups, nil
 }
 
-func (s *profileStore) prepareFile(path string) (closer io.Closer, err error) {
+func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
 	file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
 	if err != nil {
 		return nil, err
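The changed return type is what enables the size histogram below: io.Closer exposes nothing but Close, while *os.File also provides Stat. A standalone illustration (the temp-file name is arbitrary):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "segment-*.parquet")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	var c io.Closer = f // the old return type: only Close is reachable
	_ = c               // c.Stat() would not compile

	st, err := f.Stat() // *os.File gives access to the on-disk size
	if err != nil {
		panic(err)
	}
	fmt.Println("size on disk:", st.Size())
}
```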
@@ -207,6 +207,13 @@ func (s *profileStore) empty() bool {
 // cutRowGroups gets called when a particular row group has been finished and it will flush it to disk. The caller of cutRowGroups should be holding the write lock.
 // TODO: write row groups asynchronously
 func (s *profileStore) cutRowGroup() (err error) {
+	// if cutRowGroup fails, record it as a failed segment
+	defer func() {
+		if err != nil {
+			s.metrics.writtenProfileSegments.WithLabelValues("failed").Inc()
+		}
+	}()
+
 	// do nothing with empty buffer
 	bufferRowNums := len(s.slice)
 	if bufferRowNums == 0 {
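The deferred closure works because err is a named return value: every return path through cutRowGroup assigns it, and the defer observes the final value, so a single statement covers all failure exits. A minimal reproduction of the pattern:

```go
package main

import (
	"errors"
	"fmt"
)

// flushSegment mimics cutRowGroup's accounting: the named return err lets
// the deferred function see whether the call ultimately failed.
func flushSegment(fail bool) (err error) {
	defer func() {
		if err != nil {
			fmt.Println(`would increment writtenProfileSegments{status="failed"}`)
		}
	}()
	if fail {
		return errors.New("disk full") // hypothetical failure
	}
	return nil
}

func main() {
	_ = flushSegment(true)  // prints the failed-increment message
	_ = flushSegment(false) // prints nothing
}
```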
@@ -218,7 +225,7 @@ func (s *profileStore) cutRowGroup() (err error) {
 		fmt.Sprintf("%s.%d%s", s.persister.Name(), s.rowsFlushed, block.ParquetSuffix),
 	)
 
-	fileCloser, err := s.prepareFile(path)
+	f, err := s.prepareFile(path)
 	if err != nil {
 		return err
 	}
@@ -235,9 +242,15 @@ func (s *profileStore) cutRowGroup() (err error) {
 		return errors.Wrap(err, "close row group segment writer")
 	}
 
-	if err := fileCloser.Close(); err != nil {
+	// get row group segment size on disk before the file is closed
+	if stat, err := f.Stat(); err == nil {
+		s.metrics.writtenProfileSegmentsBytes.Observe(float64(stat.Size()))
+	}
+
+	if err := f.Close(); err != nil {
 		return errors.Wrap(err, "closing row group segment file")
 	}
+	s.metrics.writtenProfileSegments.WithLabelValues("success").Inc()
 
 	s.rowsFlushed += uint64(n)
 
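One subtlety in the hunk above: Stat must run while the file is still open, hence reading the size before Close; on a closed *os.File it returns ErrClosed and the histogram would never be observed. A quick check:

```go
package main

import (
	"fmt"
	"os"
)

func main() {
	f, err := os.CreateTemp("", "seg-*")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())

	if _, err := f.WriteString("hello"); err != nil {
		panic(err)
	}

	st, err := f.Stat() // fine while the file is open
	if err != nil {
		panic(err)
	}
	fmt.Println("size while open:", st.Size()) // 5

	f.Close()
	if _, err := f.Stat(); err != nil {
		fmt.Println("after Close:", err) // ...: file already closed
	}
}
```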
