This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Flush profiles rowgroup asynchronously (#760)
cyriltovena authored Jun 8, 2023
1 parent 782a320 commit faf62ed
Showing 3 changed files with 128 additions and 61 deletions.
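The change in a nutshell: ingest no longer calls cutRowGroup synchronously. When the buffer fills, it flips an atomic flag and sends the current buffer length down a channel, and a dedicated goroutine performs the cut while ingestion continues. Below is a minimal, self-contained sketch of that producer/consumer pattern using only the standard library — the store type, field names, and integer row payload are invented stand-ins, not the actual phlaredb code.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// store is a stripped-down stand-in for profileStore: ingest appends
// rows and, when the buffer is full, signals the background loop
// instead of flushing synchronously.
type store struct {
	mu       sync.Mutex
	buf      []int
	flushing atomic.Bool // true while a background flush is in flight
	flushCh  chan int    // carries len(buf) at signal time, like flushQueue
	wg       sync.WaitGroup
	maxRows  int
}

func newStore(maxRows int) *store {
	s := &store{flushCh: make(chan int), maxRows: maxRows}
	s.wg.Add(1)
	go s.flushLoop()
	return s
}

func (s *store) ingest(v int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Only signal when no flush is in flight, mirroring the
	// !s.flushing.Load() guard the commit adds to ingest().
	if !s.flushing.Load() && len(s.buf) >= s.maxRows {
		s.flushing.Store(true)
		s.flushCh <- len(s.buf) // flush only the rows buffered so far
	}
	s.buf = append(s.buf, v)
}

func (s *store) flushLoop() {
	defer s.wg.Done()
	for n := range s.flushCh {
		s.mu.Lock()
		flushed := append([]int(nil), s.buf[:n]...) // copy the prefix to flush
		s.buf = s.buf[n:]                           // keep rows ingested meanwhile
		s.mu.Unlock()
		fmt.Printf("flushed %d rows\n", len(flushed)) // stand-in for writing a row group
		s.flushing.Store(false)
	}
}

// close mirrors Close(): closing the channel stops the goroutine,
// and the WaitGroup waits for it to exit.
func (s *store) close() {
	close(s.flushCh)
	s.wg.Wait()
}

func main() {
	s := newStore(3)
	for i := 0; i < 10; i++ {
		s.ingest(i)
	}
	s.close()
}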
142 changes: 98 additions & 44 deletions pkg/phlaredb/profile_store.go
@@ -53,16 +53,25 @@ type profileStore struct {
 	rowsFlushed uint64
 	rowGroups   []*rowGroupOnDisk
 	index       *profilesIndex
+
+	flushing    *atomic.Bool
+	flushQueue  chan int // channel to signal that a flush is needed for slice[:n]
+	closeOnce   sync.Once
+	flushWg     sync.WaitGroup
+	flushBuffer []*schemav1.Profile
 }

 func newProfileStore(phlarectx context.Context) *profileStore {
 	s := &profileStore{
-		logger:    phlarecontext.Logger(phlarectx),
-		metrics:   contextHeadMetrics(phlarectx),
-		persister: &schemav1.ProfilePersister{},
-		helper:    &profilesHelper{},
-	}
-
+		logger:     phlarecontext.Logger(phlarectx),
+		metrics:    contextHeadMetrics(phlarectx),
+		persister:  &schemav1.ProfilePersister{},
+		helper:     &profilesHelper{},
+		flushing:   atomic.NewBool(false),
+		flushQueue: make(chan int),
+	}
+	s.flushWg.Add(1)
+	go s.cutRowGroupLoop()
 	// Initialize writer on /dev/null
 	// TODO: Reuse parquet.Writer beyond life time of the head.
 	s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(),
@@ -91,6 +100,10 @@ func (s *profileStore) Init(path string, cfg *ParquetConfig, metrics *headMetric
 	if err := s.Close(); err != nil {
 		return err
 	}
+	s.flushQueue = make(chan int)
+	s.closeOnce = sync.Once{}
+	s.flushWg.Add(1)
+	go s.cutRowGroupLoop()

 	// create index
 	s.index, err = newProfileIndex(32, s.metrics)
@@ -110,6 +123,13 @@ }
 }

 func (s *profileStore) Close() error {
+	if s.flushQueue != nil {
+		s.closeOnce.Do(func() {
+			close(s.flushQueue)
+		})
+
+		s.flushWg.Wait()
+	}
 	return nil
 }

@@ -121,34 +141,39 @@ func (s *profileStore) RowGroups() (rowGroups []parquet.RowGroup) {
 	return rowGroups
 }

-func (s *profileStore) profileSort(i, j int) bool {
-	// first compare the labels, if they don't match return
-	var (
-		pI   = s.slice[i]
-		pJ   = s.slice[j]
-		lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs
-		lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs
-	)
-	if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
-		return cmp < 0
-	}
-
-	// then compare timenanos, if they don't match return
-	if pI.TimeNanos < pJ.TimeNanos {
-		return true
-	} else if pI.TimeNanos > pJ.TimeNanos {
-		return false
-	}
-
-	// finally use ID as tie breaker
-	return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
+func (s *profileStore) sortProfile(slice []*schemav1.Profile) {
+	sort.Slice(slice, func(i, j int) bool {
+		// first compare the labels, if they don't match return
+		var (
+			pI   = slice[i]
+			pJ   = slice[j]
+			lbsI = s.index.profilesPerFP[pI.SeriesFingerprint].lbs
+			lbsJ = s.index.profilesPerFP[pJ.SeriesFingerprint].lbs
+		)
+		if cmp := phlaremodel.CompareLabelPairs(lbsI, lbsJ); cmp != 0 {
+			return cmp < 0
+		}
+
+		// then compare timenanos, if they don't match return
+		if pI.TimeNanos < pJ.TimeNanos {
+			return true
+		} else if pI.TimeNanos > pJ.TimeNanos {
+			return false
+		}
+
+		// finally use ID as tie breaker
+		return bytes.Compare(pI.ID[:], pJ.ID[:]) < 0
+	})
 }

 // Flush writes row groups and the index to files on disk.
 // The call is thread-safe for reading but adding new profiles
 // should not be allowed during and after the call.
 func (s *profileStore) Flush(ctx context.Context) (numRows uint64, numRowGroups uint64, err error) {
-	if err = s.cutRowGroup(); err != nil {
+	if err := s.Close(); err != nil {
+		return 0, 0, err
+	}
+	if err = s.cutRowGroup(len(s.slice)); err != nil {
 		return 0, 0, err
 	}

@@ -216,17 +241,16 @@ func (s *profileStore) prepareFile(path string) (f *os.File, err error) {
 // See index.cutRowGroup: we could find a way to not flush all the in-memory
 // profiles, including ones added since the start of the call, but only those
 // that were added before certain point (this call). The same for s.slice.
-func (s *profileStore) cutRowGroup() (err error) {
+func (s *profileStore) cutRowGroup(count int) (err error) {
 	// if cutRowGroup fails record it as failed segment
 	defer func() {
 		if err != nil {
 			s.metrics.writtenProfileSegments.WithLabelValues("failed").Inc()
 		}
 	}()

-	// do nothing with empty buffer
-	bufferRowNums := len(s.slice)
-	if bufferRowNums == 0 {
+	size := s.loadProfilesToFlush(count)
+	if len(s.flushBuffer) == 0 {
 		return nil
 	}

@@ -242,9 +266,9 @@ func (s *profileStore) cutRowGroup() (err error) {

 	// order profiles properly
 	// The slice is never accessed at reads, therefore we can sort it in-place.
-	sort.Slice(s.slice, s.profileSort)
+	s.sortProfile(s.flushBuffer)

-	n, err := s.writer.Write(s.slice)
+	n, err := s.writer.Write(s.flushBuffer)
 	if err != nil {
 		return errors.Wrap(err, "write row group segments to disk")
 	}
@@ -277,25 +301,44 @@ func (s *profileStore) cutRowGroup() (err error) {
 	s.rowsFlushed += uint64(n)
 	s.rowGroups = append(s.rowGroups, rowGroup)
 	// Cutting the index is relatively quick op (no I/O).
-	err = s.index.cutRowGroup(s.slice)
-	// After the lock is released, rows/profiles should be read from the disk.
-	s.rowsLock.Unlock()
-	for i := range s.slice {
+	err = s.index.cutRowGroup(s.flushBuffer)
+
+	s.profilesLock.Lock()
+	defer s.profilesLock.Unlock()
+	for i := range s.slice[:count] {
 		// don't retain profiles and samples in memory as re-slice.
 		s.slice[i] = nil
 	}
-	// reset slice and metrics
-	s.slice = s.slice[:0]
-	s.size.Store(0)
+	s.slice = s.slice[count:]
+	currentSize := s.size.Sub(size)
 	if err != nil {
 		return err
 	}

 	level.Debug(s.logger).Log("msg", "cut row group segment", "path", path, "numProfiles", n)
-	s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(0)
+	s.metrics.sizeBytes.WithLabelValues(s.Name()).Set(float64(currentSize))
+	// After the lock is released, rows/profiles should be read from the disk.
+	s.rowsLock.Unlock()
 	return nil
 }

+// loadProfilesToFlush loads profiles to flush into flushBuffer and returns the size of the profiles.
+func (s *profileStore) loadProfilesToFlush(count int) uint64 {
+	var size uint64
+	s.profilesLock.Lock()
+	defer s.profilesLock.Unlock()
+	if cap(s.flushBuffer) < count {
+		s.flushBuffer = make([]*schemav1.Profile, 0, count)
+	}
+	s.flushBuffer = s.flushBuffer[:0]
+	for i := 0; i < count; i++ {
+		size += s.helper.size(s.slice[i])
+		s.flushBuffer = append(s.flushBuffer, s.slice[i])
+	}
+	return size
+}
+
 func (s *profileStore) writeRowGroups(path string, rowGroups []parquet.RowGroup) (n uint64, numRowGroups uint64, err error) {
 	fileCloser, err := s.prepareFile(path)
 	if err != nil {
@@ -340,11 +383,12 @@ func (s *profileStore) ingest(_ context.Context, profiles []*schemav1.Profile, l
 	defer s.profilesLock.Unlock()

 	for pos, p := range profiles {
-		// check if row group is full
-		if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount ||
-			s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes {
-			if err := s.cutRowGroup(); err != nil {
-				return err
+		if !s.flushing.Load() {
+			// check if row group is full
+			if s.cfg.MaxBufferRowCount > 0 && len(s.slice) >= s.cfg.MaxBufferRowCount ||
+				s.cfg.MaxRowGroupBytes > 0 && s.size.Load() >= s.cfg.MaxRowGroupBytes {
+				s.flushing.Store(true)
+				s.flushQueue <- len(s.slice)
 			}
 		}

@@ -364,6 +408,16 @@ }
 	return nil
 }

+func (s *profileStore) cutRowGroupLoop() {
+	defer s.flushWg.Done()
+	for n := range s.flushQueue {
+		if err := s.cutRowGroup(n); err != nil {
+			level.Error(s.logger).Log("msg", "cutting row group", "err", err)
+		}
+		s.flushing.Store(false)
+	}
+}
+
 type rowGroupOnDisk struct {
 	parquet.RowGroup
 	file *os.File
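Since the cut now happens off the ingest goroutine, callers that must observe a completed flush poll the new flushing flag — the tests below adopt exactly this busy-wait. Sketched as a helper (the function is hypothetical, not part of the commit):

// waitForFlush blocks until any in-flight background flush has finished.
// A millisecond poll is fine for tests; production readers synchronize
// through rowsLock instead.
func waitForFlush(s *profileStore) {
	for s.flushing.Load() {
		time.Sleep(time.Millisecond)
	}
}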
27 changes: 18 additions & 9 deletions pkg/phlaredb/profile_store_test.go
@@ -124,7 +124,6 @@ func nProfileStreams(n int) func(int) *testProfile {

 		tp.populateFingerprint()
 		return tp
-
 	}
 }

@@ -216,6 +215,9 @@ func TestProfileStore_RowGroupSplitting(t *testing.T) {
 	for i := 0; i < 100; i++ {
 		p := tc.values(i)
 		require.NoError(t, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, emptyRewriter()))
+		for store.flushing.Load() {
+			time.Sleep(time.Millisecond)
+		}
 	}

 	// ensure the correct number of files are created
@@ -323,7 +325,7 @@ func BenchmarkFlush(b *testing.B) {
 			p.p.Samples = samples
 			require.NoError(b, store.ingest(ctx, []*schemav1.Profile{&p.p}, p.lbls, p.profileName, rw))
 		}
-		require.NoError(b, store.cutRowGroup())
+		require.NoError(b, store.cutRowGroup(len(store.slice)))
 	}
 	b.StartTimer()
 	_, _, err := store.Flush(context.Background())
@@ -361,7 +363,16 @@ func TestProfileStore_Querying(t *testing.T) {
 	head.profiles.cfg = &ParquetConfig{MaxRowGroupBytes: 128000, MaxBufferRowCount: 3}

 	for i := 0; i < 9; i++ {
-		require.NoError(t, ingestThreeProfileStreams(ctx, i, head.Ingest))
+		require.NoError(t, ingestThreeProfileStreams(ctx, i, func(ctx context.Context, p *profilev1.Profile, u uuid.UUID, lp ...*typesv1.LabelPair) error {
+			defer func() {
+				// wait for the profile to be flushed
+				// todo(cyriltovena): We shouldn't need this, but when calling head.Queriers(), flushing row group and then querying using the queriers previously returned we will miss the new headDiskQuerier.
+				for head.profiles.flushing.Load() {
+					time.Sleep(time.Millisecond)
+				}
+			}()
+			return head.Ingest(ctx, p, u, lp...)
+		}))
 	}

 	// now query the store
@@ -372,10 +383,8 @@
 		Type:          mustParseProfileSelector(t, "process_cpu:cpu:nanoseconds:cpu:nanoseconds"),
 	}

-	queriers := head.Queriers()
-
 	t.Run("select matching profiles", func(t *testing.T) {
-		pIt, err := queriers.SelectMatchingProfiles(ctx, params)
+		pIt, err := head.Queriers().SelectMatchingProfiles(ctx, params)
 		require.NoError(t, err)

 		// ensure we see the profiles we expect
@@ -387,7 +396,7 @@
 	})

 	t.Run("merge by labels", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()

 		bidi := client.MergeProfilesLabels(ctx)
@@ -453,7 +462,7 @@
 	})

 	t.Run("merge by stacktraces", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()

 		bidi := client.MergeProfilesStacktraces(ctx)
@@ -501,7 +510,7 @@
 	})

 	t.Run("merge by pprof", func(t *testing.T) {
-		client, cleanup := queriers.ingesterClient()
+		client, cleanup := head.Queriers().ingesterClient()
 		defer cleanup()

 		bidi := client.MergeProfilesPprof(ctx)
20 changes: 12 additions & 8 deletions pkg/phlaredb/profiles.go
@@ -433,10 +433,12 @@ func (pi *profilesIndex) writeTo(ctx context.Context, path string) ([][]rowRange
 	return rangesPerRG, writer.Close()
 }

-func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
+func (pi *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
 	// adding rowGroup and rowNum information per fingerprint
-	rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pl.profilesPerFP))
+	rowRangePerFP := make(map[model.Fingerprint]*rowRange, len(pi.profilesPerFP))
+	countPerFP := make(map[model.Fingerprint]int, len(pi.profilesPerFP))
 	for rowNum, p := range rgProfiles {
+		countPerFP[p.SeriesFingerprint]++
 		if _, ok := rowRangePerFP[p.SeriesFingerprint]; !ok {
 			rowRangePerFP[p.SeriesFingerprint] = &rowRange{
 				rowNum: int64(rowNum),
@@ -452,18 +454,19 @@ func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
 		}
 	}

-	pl.mutex.Lock()
-	defer pl.mutex.Unlock()
+	pi.mutex.Lock()
+	defer pi.mutex.Unlock()

-	pl.rowGroupsOnDisk += 1
+	pi.rowGroupsOnDisk += 1

-	for _, ps := range pl.profilesPerFP {
+	for fp, ps := range pi.profilesPerFP {
+		count := countPerFP[fp]
 		// empty all in memory profiles
-		for i := range ps.profiles {
+		for i := range ps.profiles[:count] {
 			// Allow GC to evict the object.
 			ps.profiles[i] = nil
 		}
-		ps.profiles = ps.profiles[:0]
+		ps.profiles = ps.profiles[count:]

 		// attach rowGroup and rowNum information
 		rowRange := rowRangePerFP[ps.fp]
@@ -472,6 +475,7 @@ func (pl *profilesIndex) cutRowGroup(rgProfiles []*schemav1.Profile) error {
 			ps.profilesOnDisk,
 			rowRange,
 		)
+
 	}

 	return nil
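The index change mirrors the store: rather than emptying every per-fingerprint slice, cutRowGroup counts how many of the flushed profiles belong to each series and trims only that prefix, so profiles ingested while the flush ran survive. A toy version of that bookkeeping — types and names invented, and it assumes (as the real code does) that the flushed profiles form a prefix of each series' in-memory slice:

package main

import "fmt"

// profile is a toy stand-in for schemav1.Profile; fp plays the role
// of SeriesFingerprint.
type profile struct {
	fp string
	id int
}

// trimFlushed counts flushed profiles per series, then trims exactly
// that prefix from each series' slice, nil-ing entries so the GC can
// reclaim them.
func trimFlushed(perFP map[string][]*profile, flushed []*profile) {
	countPerFP := make(map[string]int, len(perFP))
	for _, p := range flushed {
		countPerFP[p.fp]++
	}
	for fp, ps := range perFP {
		n := countPerFP[fp]
		for i := range ps[:n] {
			ps[i] = nil // allow GC to reclaim the flushed profile
		}
		perFP[fp] = ps[n:]
	}
}

func main() {
	a1, a2, b1 := &profile{"a", 1}, &profile{"a", 2}, &profile{"b", 1}
	perFP := map[string][]*profile{"a": {a1, a2}, "b": {b1}}
	trimFlushed(perFP, []*profile{a1, b1})        // flush the first profile of each series
	fmt.Println(len(perFP["a"]), len(perFP["b"])) // prints: 1 0
}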