refactor(blooms): Build new metas and blocks (#13074)
salvacorts authored Jun 4, 2024
1 parent 47f0236 commit fa2c789
Showing 16 changed files with 1,795 additions and 77 deletions.
356 changes: 356 additions & 0 deletions pkg/bloombuild/builder/batch.go
@@ -0,0 +1,356 @@
package builder

import (
	"context"
	"io"
	"math"
	"time"

	"github.com/grafana/dskit/multierror"
	"golang.org/x/exp/slices"

	"github.com/grafana/loki/v3/pkg/chunkenc"
	"github.com/grafana/loki/v3/pkg/logproto"
	logql_log "github.com/grafana/loki/v3/pkg/logql/log"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/chunk"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
)

type Fetcher[A, B any] interface {
	Fetch(ctx context.Context, inputs []A) ([]B, error)
}

type FetchFunc[A, B any] func(ctx context.Context, inputs []A) ([]B, error)

func (f FetchFunc[A, B]) Fetch(ctx context.Context, inputs []A) ([]B, error) {
	return f(ctx, inputs)
}

// batchedLoader implements `v1.Iterator[C]` in batches
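// It consumes each input slice with its corresponding fetcher, fetching at most
// batchSize items per call and mapping every fetched B into a C via mapper.
//
// Illustrative sketch (hypothetical names, not part of this change):
//
//	loader := newBatchedLoader(ctx, []Fetcher[A, B]{f1, f2}, [][]A{in1, in2}, mapBToC, 50)
//	for loader.Next() {
//		c := loader.At()
//		// use c
//	}
//	if err := loader.Err(); err != nil {
//		// handle fetch/map error
//	}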
type batchedLoader[A, B, C any] struct {
	metrics   *Metrics
	batchSize int
	ctx       context.Context
	fetchers  []Fetcher[A, B]
	work      [][]A

	mapper func(B) (C, error)
	cur    C
	batch  []B
	err    error
}

const batchedLoaderDefaultBatchSize = 50

func newBatchedLoader[A, B, C any](
	ctx context.Context,
	fetchers []Fetcher[A, B],
	inputs [][]A,
	mapper func(B) (C, error),
	batchSize int,
) *batchedLoader[A, B, C] {
	return &batchedLoader[A, B, C]{
		batchSize: max(batchSize, 1),
		ctx:       ctx,
		fetchers:  fetchers,
		work:      inputs,
		mapper:    mapper,
	}
}

func (b *batchedLoader[A, B, C]) Next() bool {

	// iterate work until we have a non-zero length batch
	for len(b.batch) == 0 {

		// empty batch + no work remaining = we're done
		if len(b.work) == 0 {
			return false
		}

		// set up next batch
		next := b.work[0]
		batchSize := min(b.batchSize, len(next))
		toFetch := next[:batchSize]
		fetcher := b.fetchers[0]

		// update work
		b.work[0] = b.work[0][batchSize:]
		if len(b.work[0]) == 0 {
			// if we've exhausted work from this set of inputs,
			// set pointer to next set of inputs
			// and their respective fetcher
			b.work = b.work[1:]
			b.fetchers = b.fetchers[1:]
		}

		// there was no work in this batch; continue (should not happen)
		if len(toFetch) == 0 {
			continue
		}

		b.batch, b.err = fetcher.Fetch(b.ctx, toFetch)
		// error fetching, short-circuit iteration
		if b.err != nil {
			return false
		}
	}

	return b.prepNext()
}

func (b *batchedLoader[_, B, C]) prepNext() bool {
	b.cur, b.err = b.mapper(b.batch[0])
	b.batch = b.batch[1:]
	return b.err == nil
}

func (b *batchedLoader[_, _, C]) At() C {
	return b.cur
}

func (b *batchedLoader[_, _, _]) Err() error {
	return b.err
}

// to ensure memory is bounded while loading chunks
// TODO(owen-d): testware
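// newBatchedChunkLoader maps each fetched chunk to a v1.ChunkRefWithIter whose
// entry iterator spans the chunk's full time range with a no-op pipeline.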
func newBatchedChunkLoader(
	ctx context.Context,
	fetchers []Fetcher[chunk.Chunk, chunk.Chunk],
	inputs [][]chunk.Chunk,
	metrics *Metrics,
	batchSize int,
) *batchedLoader[chunk.Chunk, chunk.Chunk, v1.ChunkRefWithIter] {

	mapper := func(c chunk.Chunk) (v1.ChunkRefWithIter, error) {
		chk := c.Data.(*chunkenc.Facade).LokiChunk()
		metrics.chunkSize.Observe(float64(chk.UncompressedSize()))
		itr, err := chk.Iterator(
			ctx,
			time.Unix(0, 0),
			time.Unix(0, math.MaxInt64),
			logproto.FORWARD,
			logql_log.NewNoopPipeline().ForStream(nil),
		)

		if err != nil {
			return v1.ChunkRefWithIter{}, err
		}

		return v1.ChunkRefWithIter{
			Ref: v1.ChunkRef{
				From:     c.From,
				Through:  c.Through,
				Checksum: c.Checksum,
			},
			Itr: itr,
		}, nil
	}
	return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}

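// newBatchedBlockLoader wraps batchedLoader for a single fetcher of block
// queriers, passing each fetched querier through unchanged.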
func newBatchedBlockLoader(
	ctx context.Context,
	fetcher Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier],
	blocks []bloomshipper.BlockRef,
	batchSize int,
) *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier] {

	fetchers := []Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]{fetcher}
	inputs := [][]bloomshipper.BlockRef{blocks}
	mapper := func(a *bloomshipper.CloseableBlockQuerier) (*bloomshipper.CloseableBlockQuerier, error) {
		return a, nil
	}

	return newBatchedLoader(ctx, fetchers, inputs, mapper, batchSize)
}

// compiler checks
var _ v1.Iterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.CloseableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}
var _ v1.ResettableIterator[*v1.SeriesWithBloom] = &blockLoadingIter{}

// TODO(chaudum): testware
func newBlockLoadingIter(ctx context.Context, blocks []bloomshipper.BlockRef, fetcher FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier], batchSize int) *blockLoadingIter {

	return &blockLoadingIter{
		ctx:       ctx,
		fetcher:   fetcher,
		inputs:    blocks,
		batchSize: batchSize,
		loaded:    make(map[io.Closer]struct{}),
	}
}

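// blockLoadingIter lazily loads blocks in batches of overlapping fingerprint
// ranges and yields their merged, de-duplicated series as a single iterator.
//
// Illustrative sketch (hypothetical names, not part of this change):
//
//	itr := newBlockLoadingIter(ctx, refs, fetchFn, 10)
//	defer itr.Close()
//	for itr.Next() {
//		swb := itr.At()
//		// use swb
//	}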
type blockLoadingIter struct {
	// constructor arguments
	ctx         context.Context
	fetcher     Fetcher[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier]
	inputs      []bloomshipper.BlockRef
	overlapping v1.Iterator[[]bloomshipper.BlockRef]
	batchSize   int
	// optional arguments
	filter func(*bloomshipper.CloseableBlockQuerier) bool
	// internals
	initialized bool
	err         error
	iter        v1.Iterator[*v1.SeriesWithBloom]
	loader      *batchedLoader[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier, *bloomshipper.CloseableBlockQuerier]
	loaded      map[io.Closer]struct{}
}

// At implements v1.Iterator.
func (i *blockLoadingIter) At() *v1.SeriesWithBloom {
	if !i.initialized {
		panic("iterator not initialized")
	}
	return i.iter.At()
}

// Err implements v1.Iterator.
func (i *blockLoadingIter) Err() error {
	if !i.initialized {
		panic("iterator not initialized")
	}
	if i.err != nil {
		return i.err
	}
	return i.iter.Err()
}

func (i *blockLoadingIter) init() {
	if i.initialized {
		return
	}

	// group overlapping blocks
	i.overlapping = overlappingBlocksIter(i.inputs)

	// set initial iter
	i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()

	// set "match all" filter function if not present
	if i.filter == nil {
		i.filter = func(cbq *bloomshipper.CloseableBlockQuerier) bool { return true }
	}

	// done
	i.initialized = true
}

// load next populates the underlying iter via relevant batches
// and returns the result of iter.Next()
func (i *blockLoadingIter) loadNext() bool {
	for i.overlapping.Next() {
		blockRefs := i.overlapping.At()

		loader := newBatchedBlockLoader(i.ctx, i.fetcher, blockRefs, i.batchSize)
		filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

		iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
		for filtered.Next() {
			bq := filtered.At()
			i.loaded[bq] = struct{}{}
			iter, err := bq.SeriesIter()
			if err != nil {
				i.err = err
				i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
				return false
			}
			iters = append(iters, iter)
		}

		if err := filtered.Err(); err != nil {
			i.err = err
			i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
			return false
		}

		// edge case: we've filtered out all blocks in the batch; check next batch
		if len(iters) == 0 {
			continue
		}

		// Turn the list of blocks into a single iterator that returns the next series
		mergedBlocks := v1.NewHeapIterForSeriesWithBloom(iters...)
		// two overlapping blocks can conceivably have the same series, so we need to dedupe,
		// preferring the one with the most chunks already indexed since we'll have
		// to add fewer chunks to the bloom
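		// (e.g. if the same series appears in two blocks with 10 and 5 indexed chunks, the 10-chunk copy wins)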
		i.iter = v1.NewDedupingIter[*v1.SeriesWithBloom, *v1.SeriesWithBloom](
			func(a, b *v1.SeriesWithBloom) bool {
				return a.Series.Fingerprint == b.Series.Fingerprint
			},
			v1.Identity[*v1.SeriesWithBloom],
			func(a, b *v1.SeriesWithBloom) *v1.SeriesWithBloom {
				if len(a.Series.Chunks) > len(b.Series.Chunks) {
					return a
				}
				return b
			},
			v1.NewPeekingIter(mergedBlocks),
		)
		return i.iter.Next()
	}

	i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
	i.err = i.overlapping.Err()
	return false
}

// Next implements v1.Iterator.
func (i *blockLoadingIter) Next() bool {
	i.init()
	return i.iter.Next() || i.loadNext()
}

// Close implements v1.CloseableIterator.
func (i *blockLoadingIter) Close() error {
	var err multierror.MultiError
	for k := range i.loaded {
		err.Add(k.Close())
	}
	return err.Err()
}

// Reset implements v1.ResettableIterator.
// TODO(chaudum) Cache already fetched blocks to avoid the overhead of
// creating the reader.
func (i *blockLoadingIter) Reset() error {
	if !i.initialized {
		return nil
	}
	// close loaded queriers
	err := i.Close()
	i.initialized = false
	clear(i.loaded)
	return err
}

func (i *blockLoadingIter) Filter(filter func(*bloomshipper.CloseableBlockQuerier) bool) {
	if i.initialized {
		panic("iterator already initialized")
	}
	i.filter = filter
}

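// overlappingBlocksIter groups the given block refs into slices whose
// fingerprint bounds overlap, assuming the refs are sorted by bounds;
// e.g. bounds [0,10], [5,15], [20,30] yield the groups {[0,10],[5,15]}
// and {[20,30]}.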
func overlappingBlocksIter(inputs []bloomshipper.BlockRef) v1.Iterator[[]bloomshipper.BlockRef] {
	// can we assume sorted blocks?
	peekIter := v1.NewPeekingIter(v1.NewSliceIter(inputs))

	return v1.NewDedupingIter[bloomshipper.BlockRef, []bloomshipper.BlockRef](
		func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) bool {
			minFp := b[0].Bounds.Min
			maxFp := slices.MaxFunc(b, func(a, b bloomshipper.BlockRef) int { return int(a.Bounds.Max - b.Bounds.Max) }).Bounds.Max
			return a.Bounds.Overlaps(v1.NewBounds(minFp, maxFp))
		},
		func(a bloomshipper.BlockRef) []bloomshipper.BlockRef {
			return []bloomshipper.BlockRef{a}
		},
		func(a bloomshipper.BlockRef, b []bloomshipper.BlockRef) []bloomshipper.BlockRef {
			return append(b, a)
		},
		peekIter,
	)
}