This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Vertical query merging and compaction (#370)
* Vertical series iterator

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Select overlapped blocks first in compactor Plan()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Added vertical compaction

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Code cleanup and comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix tests

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Add benchmark for compaction

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Perform vertical compaction only when blocks are overlapping.

Actions for vertical compaction:
* Sorting chunk metas
* Calling chunks.MergeOverlappingChunks on the chunks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Benchmark for vertical compaction

* BenchmarkNormalCompaction => BenchmarkCompaction
* Moved the benchmark from db_test.go to compact_test.go

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Benchmark for query iterator and seek for non overlapping blocks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Vertical query merge only for overlapping blocks

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Simplify logging in Compact(...)

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Updated CHANGELOG.md

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Calculate overlapping inside populateBlock

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* MinTime and MaxTime for BlockReader.

Using this to find overlapping blocks in populateBlock()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Sort blocks w.r.t. MinTime in reload()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Log about overlapping in LeveledCompactor.write() instead of returning bool

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Log about overlapping inside LeveledCompactor.populateBlock()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Refactor createBlock to take optional []Series

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* review1

Signed-off-by: Krasi Georgiev <kgeorgie@redhat.com>

* Updated CHANGELOG and minor nits

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* nits

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Updated CHANGELOG

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Refactor iterator and seek benchmarks for Querier.

Also covers overlapping blocks.

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Additional test case

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* genSeries takes optional labels. Updated BenchmarkQueryIterator and BenchmarkQuerySeek.

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Split genSeries into genSeries and populateSeries

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Check error in benchmark

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Fix review comments

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>

* Warn about overlapping blocks in reload()

Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
codesome authored and gouthamve committed Feb 14, 2019
1 parent 89ee5aa commit c59ed49
Showing 13 changed files with 1,158 additions and 121 deletions.
12 changes: 8 additions & 4 deletions CHANGELOG.md
@@ -1,4 +1,8 @@
## master / unreleased
- [ENHANCEMENT] Time-overlapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
- Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas.
- Added `MinTime` and `MaxTime` method for `BlockReader`.
- [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db.
- [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`
@@ -7,9 +11,9 @@
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size-based retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- New public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL package. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
@@ -24,4 +28,4 @@
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
-[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
- [FEATURE] tsdbutil analyze subcommand to find churn, high cardinality, etc.
12 changes: 12 additions & 0 deletions block.go
@@ -135,6 +135,12 @@ type BlockReader interface {

	// Tombstones returns a TombstoneReader over the block's deleted data.
	Tombstones() (TombstoneReader, error)

	// MinTime returns the min time of the block.
	MinTime() int64

	// MaxTime returns the max time of the block.
	MaxTime() int64
}

// Appendable defines an entity to which data can be appended.
@@ -363,6 +369,12 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }

// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }
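
The commit message says `MinTime` and `MaxTime` are used to find overlapping blocks in `populateBlock()`. The following is a minimal sketch of that kind of check, not the code from this commit. The `timeRange` type and `anyOverlap` function are hypothetical stand-ins, and the sketch assumes each range is half-open, `[MinTime, MaxTime)`.

```go
package main

import (
	"fmt"
	"sort"
)

// timeRange is a hypothetical stand-in for anything that exposes
// MinTime/MaxTime, such as a BlockReader.
type timeRange struct {
	name       string
	mint, maxt int64
}

func (r timeRange) MinTime() int64 { return r.mint }
func (r timeRange) MaxTime() int64 { return r.maxt }

// anyOverlap reports whether any two ranges share a timestamp.
// Assumption of this sketch: ranges are half-open, [MinTime, MaxTime).
func anyOverlap(rs []timeRange) bool {
	if len(rs) < 2 {
		return false
	}
	// Sort a copy by MinTime so the caller's slice is left untouched.
	sorted := append([]timeRange(nil), rs...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].MinTime() < sorted[j].MinTime()
	})
	// Track the largest MaxTime seen so far; a later range overlaps
	// if it starts before that point.
	globalMaxt := sorted[0].MaxTime()
	for _, r := range sorted[1:] {
		if r.MinTime() < globalMaxt {
			return true
		}
		if r.MaxTime() > globalMaxt {
			globalMaxt = r.MaxTime()
		}
	}
	return false
}

func main() {
	adjacent := []timeRange{{"a", 0, 10}, {"b", 10, 20}}
	overlapping := []timeRange{{"a", 0, 10}, {"b", 5, 20}}
	fmt.Println(anyOverlap(adjacent), anyOverlap(overlapping))
}
```

Tracking the running maximum rather than only the previous range's `MaxTime` matters when one long range fully contains several shorter ones.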

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

21 changes: 20 additions & 1 deletion block_test.go
@@ -101,8 +101,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
if totalSeries == 0 || labelCount == 0 {
return nil
}
series := make([]Series, totalSeries)

series := make([]Series, totalSeries)
for i := 0; i < totalSeries; i++ {
lbls := make(map[string]string, labelCount)
for len(lbls) < labelCount {
@@ -114,7 +114,26 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []Series {
}
series[i] = newSeries(lbls, samples)
}
return series
}

// populateSeries generates one series per non-empty label set in lbls,
// with one random-valued sample at every timestamp in [mint, maxt].
func populateSeries(lbls []map[string]string, mint, maxt int64) []Series {
	if len(lbls) == 0 {
		return nil
	}

	series := make([]Series, 0, len(lbls))
	for _, lbl := range lbls {
		if len(lbl) == 0 {
			continue
		}
		samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
		for t := mint; t <= maxt; t++ {
			samples = append(samples, sample{t: t, v: rand.Float64()})
		}
		series = append(series, newSeries(lbl, samples))
	}
	return series
}

78 changes: 78 additions & 0 deletions chunks/chunks.go
@@ -196,6 +196,84 @@ func (w *Writer) write(b []byte) error {
return err
}

// MergeOverlappingChunks merges chunks whose time ranges overlap.
// When samples in overlapping chunks share a timestamp, the sample from the
// chunk appearing later in chks is retained.
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
	if len(chks) < 2 {
		return chks, nil
	}
	newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
	newChks = append(newChks, chks[0])
	last := 0
	for _, c := range chks[1:] {
		// We need to check only the last chunk in newChks.
		// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
		//         (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
		// So never overlaps with newChks[last-1] or anything before that.
		if c.MinTime > newChks[last].MaxTime {
			newChks = append(newChks, c)
			// Keep last pointing at the final element of newChks; without
			// this, later chunks would be compared against newChks[0] only.
			last++
			continue
		}
		nc := &newChks[last]
		if c.MaxTime > nc.MaxTime {
			nc.MaxTime = c.MaxTime
		}
		chk, err := MergeChunks(nc.Chunk, c.Chunk)
		if err != nil {
			return nil, err
		}
		nc.Chunk = chk
	}

	return newChks, nil
}
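
The time-range half of `MergeOverlappingChunks` can be looked at without any chunk data. The sketch below uses a hypothetical `span` type and `mergeSpans` function, which are illustrative names and not part of the tsdb package. Like the comparison in the function above, it assumes inclusive `MinTime`/`MaxTime` bounds and input sorted by `MinTime`.

```go
package main

import "fmt"

// span is a hypothetical stand-in for the MinTime/MaxTime fields of a chunk Meta.
type span struct {
	MinTime, MaxTime int64
}

// mergeSpans collapses overlapping spans into one.
// It assumes the input is sorted by MinTime and that bounds are inclusive.
func mergeSpans(in []span) []span {
	if len(in) < 2 {
		return in
	}
	out := make([]span, 0, len(in))
	out = append(out, in[0])
	for _, s := range in[1:] {
		// Only the most recently emitted span can overlap with s,
		// because everything before it ends even earlier.
		last := &out[len(out)-1]
		if s.MinTime > last.MaxTime {
			out = append(out, s)
			continue
		}
		// Overlap: extend the last span if s reaches further.
		if s.MaxTime > last.MaxTime {
			last.MaxTime = s.MaxTime
		}
	}
	return out
}

func main() {
	fmt.Println(mergeSpans([]span{{0, 10}, {5, 15}, {20, 30}, {25, 35}}))
}
```

Indexing with `len(out)-1` on every iteration removes the need for a separately maintained `last` counter, which is one way to avoid the counter drifting out of step with the slice.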

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
	newChunk := chunkenc.NewXORChunk()
	app, err := newChunk.Appender()
	if err != nil {
		return nil, err
	}
	ait := a.Iterator()
	bit := b.Iterator()
	aok, bok := ait.Next(), bit.Next()
	for aok && bok {
		at, av := ait.At()
		bt, bv := bit.At()
		if at < bt {
			app.Append(at, av)
			aok = ait.Next()
		} else if bt < at {
			app.Append(bt, bv)
			bok = bit.Next()
		} else {
			app.Append(bt, bv)
			aok = ait.Next()
			bok = bit.Next()
		}
	}
	for aok {
		at, av := ait.At()
		app.Append(at, av)
		aok = ait.Next()
	}
	for bok {
		bt, bv := bit.At()
		app.Append(bt, bv)
		bok = bit.Next()
	}
	if ait.Err() != nil {
		return nil, ait.Err()
	}
	if bit.Err() != nil {
		return nil, bit.Err()
	}
	return newChunk, nil
}
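
`MergeChunks` is a two-way merge of timestamp-sorted samples in which the second input wins ties. The sketch below shows the same idea on plain slices, so it runs without the chunk encoding. The `sample` type and `mergeSamples` function are hypothetical names for illustration only, and both inputs are assumed to be sorted by timestamp with no duplicates within one input.

```go
package main

import "fmt"

// sample is a hypothetical (timestamp, value) pair.
type sample struct {
	T int64
	V float64
}

// mergeSamples merges two timestamp-sorted slices. When both inputs
// contain a sample at the same timestamp, the one from b is kept and
// the one from a is dropped.
func mergeSamples(a, b []sample) []sample {
	out := make([]sample, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i].T < b[j].T:
			out = append(out, a[i])
			i++
		case b[j].T < a[i].T:
			out = append(out, b[j])
			j++
		default:
			// Same timestamp in both inputs: b wins, a is skipped.
			out = append(out, b[j])
			i++
			j++
		}
	}
	// At most one of these tails is non-empty.
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

func main() {
	a := []sample{{1, 1}, {2, 2}, {4, 4}}
	b := []sample{{2, 20}, {3, 30}}
	fmt.Println(mergeSamples(a, b))
}
```

Because `b` wins ties, the order of the arguments decides which value survives for a duplicated timestamp. This matches the comment on `MergeOverlappingChunks`, where the later chunk is passed as `b`.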

func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.