diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 3b1df6e0f..6cd335e47 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -53,16 +53,25 @@ type profileStore struct { rowsFlushed uint64 rowGroups []*rowGroupOnDisk index *profilesIndex + + flushing *atomic.Bool + flushQueue chan int // channel to signal that a flush is needed for slice[:n] + closeOnce sync.Once + flushWg sync.WaitGroup + flushBuffer []*schemav1.Profile } func newProfileStore(phlarectx context.Context) *profileStore { s := &profileStore{ - logger: phlarecontext.Logger(phlarectx), - metrics: contextHeadMetrics(phlarectx), - persister: &schemav1.ProfilePersister{}, - helper: &profilesHelper{}, - } - + logger: phlarecontext.Logger(phlarectx), + metrics: contextHeadMetrics(phlarectx), + persister: &schemav1.ProfilePersister{}, + helper: &profilesHelper{}, + flushing: atomic.NewBool(false), + flushQueue: make(chan int), + } + s.flushWg.Add(1) + go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(), @@ -91,6 +100,10 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetric if err := s.Close(); err != nil { return err } + s.flushQueue = make(chan int) + s.closeOnce = sync.Once{} + s.flushWg.Add(1) + go s.cutRowGroupLoop() // create index s.index, err = newProfileIndex(32, s.metrics) @@ -110,6 +123,13 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetric } func (s *profileStore) Close() error { + if s.flushQueue != nil { + s.closeOnce.Do(func() { + close(s.flushQueue) + }) + + s.flushWg.Wait() + } return nil } @@ -121,34 +141,39 @@ func (s *profileStore) RowGroups() (rowGroups []parquet.RowGroup) { return rowGroups } -func (s *profileStore) profileSort(i, j int) bool { - // first compare the labels, if they don't match return - var ( - pI = s.slice[i] - pJ = s.slice[j] - lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs - lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs - ) - if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 { - return cmp < 0 - } +func (s *profileStore) sortProfile(slice []*schemav1.Profile) { + sort.Slice(slice, func(i, j int) bool { + // first compare the labels, if they don't match return + var ( + pI = slice[i] + pJ = slice[j] + lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs + lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs + ) + if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 { + return cmp < 0 + } - // then compare timenanos, if they don't match return - if pI.TimeNanos < pJ.TimeNanos { - return true - } else if pI.TimeNanos > pJ.TimeNanos { - return false - } + // then compare timenanos, if they don't match return + if pI.TimeNanos < pJ.TimeNanos { + return true + } else if pI.TimeNanos > pJ.TimeNanos { + return false + } - // finally use ID as tie breaker - return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0 + // finally use ID as tie breaker + return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0 + }) } // Flush writes row groups and the index to files on disk. // The call is thread-safe for reading but adding new profiles // should not be allowed during and after the call. func (s *profileStore) Flush(ctx context.Context) (numRows uint64, numRowGroups uint64, err error) { - if err = s.cutRowGroup(); err != nil { + if err := s.Close(); err != nil { + return 0, 0, err + } + if err = s.cutRowGroup(len(s.slice)); err != nil { return 0, 0, err } @@ -216,7 +241,7 @@ func (s *profileStore) prepareFile(path string) (f *os.File, err error) { // See index.cutRowGroup: we could find a way to not flush all the in-memory // profiles, including ones added since the start of the call, but only those // that were added before certain point (this call). The same for s.slice. -func (s *profileStore) cutRowGroup() (err error) { +func (s *profileStore) cutRowGroup(count int) (err error) { // if cutRowGroup fails record it as failed segment defer func() { if err != nil { @@ -224,9 +249,8 @@ func (s *profileStore) cutRowGroup() (err error) { } }() - // do nothing with empty buffer - bufferRowNums := len(s.slice) - if bufferRowNums == 0 { + size := s.loadProfilesToFlush(count) + if len(s.flushBuffer) == 0 { return nil } @@ -242,9 +266,9 @@ func (s *profileStore) cutRowGroup() (err error) { // order profiles properly // The slice is never accessed at reads, therefore we can sort it in-place. - sort.Slice(s.slice, s.profileSort) + s.sortProfile(s.flushBuffer) - n, err := s.writer.Write(s.slice) + n, err := s.writer.Write(s.flushBuffer) if err != nil { return errors.Wrap(err, "write row group segments to disk") } @@ -277,25 +301,44 @@ func (s *profileStore) cutRowGroup() (err error) { s.rowsFlushed += uint64(n) s.rowGroups = append(s.rowGroups, rowGroup) // Cutting the index is relatively quick op (no I/O). - err = s.index.cutRowGroup(s.slice) - // After the lock is released, rows/profiles should be read from the disk. - s.rowsLock.Unlock() - for i := range s.slice { + err = s.index.cutRowGroup(s.flushBuffer) + + s.profilesLock.Lock() + defer s.profilesLock.Unlock() + for i := range s.slice[:count] { // don't retain profiles and samples in memory as re-slice. s.slice[i] = nil } // reset slice and metrics - s.slice = s.slice[:0] - s.size.Store(0) + s.slice = s.slice[count:] + currentSize := s.size.Sub(size) if err != nil { return err } level.Debug(s.logger).Log("msg", "cut row group segment", "path", path, "numProfiles", n) - s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(0) + s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(currentSize)) + // After the lock is released, rows/profiles should be read from the disk. + s.rowsLock.Unlock() return nil } +// loadProfilesToFlush loads profiles to flush into flushBuffer and returns the size of the profiles. +func (s *profileStore) loadProfilesToFlush(count int) uint64 { + var size uint64 + s.profilesLock.Lock() + defer s.profilesLock.Unlock() + if cap(s.flushBuffer) < count { + s.flushBuffer = make([]*schemav1.Profile, 0, count) + } + s.flushBuffer = s.flushBuffer[:0] + for i := 0; i < count; i++ { + size += s.helper.size(s.slice[i]) + s.flushBuffer = append(s.flushBuffer, s.slice[i]) + } + return size +} + func (s *profileStore) writeRowGroups(path string, rowGroups []parquet.RowGroup) (n uint64, numRowGroups uint64, err error) { fileCloser, err := s.prepareFile(path) if err != nil { @@ -340,11 +383,12 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l defer s.profilesLock.Unlock() for pos, p := range profiles { - // check if row group is full - if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount || - s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes { - if err := s.cutRowGroup(); err != nil { - return err + if !s.flushing.Load() { + // check if row group is full + if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount || + s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes { + s.flushing.Store(true) + s.flushQueue <- len(s.slice) } } @@ -364,6 +408,16 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l return nil } +func (s *profileStore) cutRowGroupLoop() { + defer s.flushWg.Done() + for n := range s.flushQueue { + if err := s.cutRowGroup(n); err != nil { + level.Error(s.logger).Log("msg", "cutting row group", "err", err) + } + s.flushing.Store(false) + } +} + type rowGroupOnDisk struct { parquet.RowGroup file *os.File diff --git a/pkg/phlaredb/profile_store_test.go b/pkg/phlaredb/profile_store_test.go index 4d81bf79f..719c015df 100644 --- a/pkg/phlaredb/profile_store_test.go +++ b/pkg/phlaredb/profile_store_test.go @@ -124,7 +124,6 @@ func nProfileStreams(n int) func(int) *testProfile { tp.populateFingerprint() return tp - } } @@ -216,6 +215,9 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) { for i := 0; i < 100; i++ { p := tc.values(i) require.NoError(t, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, emptyRewriter())) + for store.flushing.Load() { + time.Sleep(time.Millisecond) + } } // ensure the correct number of files are created @@ -323,7 +325,7 @@ func BenchmarkFlush(b *testing.B) { p.p.Samples = samples require.NoError(b, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, rw)) } - require.NoError(b, store.cutRowGroup()) + require.NoError(b, store.cutRowGroup(len(store.slice))) } b.StartTimer() _, _, err := store.Flush(context.Background()) @@ -361,7 +363,16 @@ func TestProfileStore_Querying(t *testing.T) { head.profiles.cfg = &ParquetConfig{MaxRowGroupBytes: 128000, MaxBufferRowCount: 3} for i := 0; i < 9; i++ { - require.NoError(t, ingestThreeProfileStreams(ctx, i, head.Ingest)) + require.NoError(t, ingestThreeProfileStreams(ctx, i, func(ctx context.Context, p *profilev1.Profile, u uuid.UUID, lp ...*typesv1.LabelPair) error { + defer func() { + // wait for the profile to be flushed + // todo(cyriltovena): We shouldn't need this, but when calling head.Queriers(), flushing row group and then querying using the queriers previously returned we will miss the new headDiskQuerier. + for head.profiles.flushing.Load() { + time.Sleep(time.Millisecond) + } + }() + return head.Ingest(ctx, p, u, lp...) + })) } // now query the store @@ -372,10 +383,8 @@ func TestProfileStore_Querying(t *testing.T) { Type: mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"), } - queriers := head.Queriers() - t.Run("select matching profiles", func(t *testing.T) { - pIt, err := queriers.SelectMatchingProfiles(ctx, params) + pIt, err := head.Queriers().SelectMatchingProfiles(ctx, params) require.NoError(t, err) // ensure we see the profiles we expect @@ -387,7 +396,7 @@ func TestProfileStore_Querying(t *testing.T) { }) t.Run("merge by labels", func(t *testing.T) { - client, cleanup := queriers.ingesterClient() + client, cleanup := head.Queriers().ingesterClient() defer cleanup() bidi := client.MergeProfilesLabels(ctx) @@ -453,7 +462,7 @@ func TestProfileStore_Querying(t *testing.T) { }) t.Run("merge by stacktraces", func(t *testing.T) { - client, cleanup := queriers.ingesterClient() + client, cleanup := head.Queriers().ingesterClient() defer cleanup() bidi := client.MergeProfilesStacktraces(ctx) @@ -501,7 +510,7 @@ func TestProfileStore_Querying(t *testing.T) { }) t.Run("merge by pprof", func(t *testing.T) { - client, cleanup := queriers.ingesterClient() + client, cleanup := head.Queriers().ingesterClient() defer cleanup() bidi := client.MergeProfilesPprof(ctx) diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index 29a3d21bc..ef73c046b 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -433,10 +433,12 @@ func (pi *profilesIndex) writeTo(ctx context.Context, path string) ([][]rowRange return rangesPerRG, writer.Close() } -func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error { +func (pi *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error { // adding rowGroup and rowNum information per fingerprint - rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pl.profilesPerFP)) + rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pi.profilesPerFP)) + countPerFP := make(map[model.Fingerprint]int, len(pi.profilesPerFP)) for rowNum, p := range rgProfiles { + countPerFP[p.SeriesFingerprint]++ if _, ok := rowRangePerFP[p.SeriesFingerprint]; !ok { rowRangePerFP[p.SeriesFingerprint] = &rowRange{ rowNum: int64(rowNum), @@ -452,18 +454,19 @@ func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error { } } - pl.mutex.Lock() - defer pl.mutex.Unlock() + pi.mutex.Lock() + defer pi.mutex.Unlock() - pl.rowGroupsOnDisk += 1 + pi.rowGroupsOnDisk += 1 - for _, ps := range pl.profilesPerFP { + for fp, ps := range pi.profilesPerFP { + count := countPerFP[fp] // empty all in memory profiles - for i := range ps.profiles { + for i := range ps.profiles[:count] { // Allow GC to evict the object. ps.profiles[i] = nil } - ps.profiles = ps.profiles[:0] + ps.profiles = ps.profiles[count:] // attach rowGroup and rowNum information rowRange := rowRangePerFP[ps.fp] @@ -472,6 +475,7 @@ func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error { ps.profilesOnDisk, rowRange, ) + } return nil