
Blooms/integration fixes (#11979)
owen-d authored Feb 19, 2024
1 parent ffc61fb commit 85f7baa
Showing 19 changed files with 222 additions and 138 deletions.
14 changes: 7 additions & 7 deletions pkg/bloomcompactor/batch.go
@@ -286,11 +286,10 @@ func (i *blockLoadingIter) loadNext() bool {
// check if there are more overlapping groups to load
if !i.overlapping.Next() {
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}
if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
}

if i.overlapping.Err() != nil {
i.err = i.overlapping.Err()
return false
}

@@ -300,7 +299,7 @@ func (i *blockLoadingIter) loadNext() bool {
filtered := v1.NewFilterIter[*bloomshipper.CloseableBlockQuerier](loader, i.filter)

iters := make([]v1.PeekingIterator[*v1.SeriesWithBloom], 0, len(blockRefs))
for filtered.Next() && filtered.Err() == nil {
for filtered.Next() {
bq := loader.At()
if _, ok := i.loaded[bq]; !ok {
i.loaded[bq] = struct{}{}
@@ -309,8 +308,9 @@ func (i *blockLoadingIter) loadNext() bool {
iters = append(iters, iter)
}

if loader.Err() != nil {
i.err = loader.Err()
if err := filtered.Err(); err != nil {
i.err = err
i.iter = v1.NewEmptyIter[*v1.SeriesWithBloom]()
return false
}

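The batch.go hunk above moves error checking out of the loop condition: the filtering iterator is drained first, then its Err() is checked exactly once, and on failure the loading iterator resets to an empty iterator rather than exposing a half-built state. Below is a minimal, hedged sketch of that pattern; the Iterator interface and sliceIter type are simplified stand-ins for illustration and are not Loki's v1 iterators.

package main

import (
	"errors"
	"fmt"
)

// Iterator is a simplified stand-in for the v1 iterator interfaces used in Loki.
type Iterator[T any] interface {
	Next() bool
	At() T
	Err() error
}

// sliceIter yields items from a slice and can surface a deferred error.
type sliceIter[T any] struct {
	items []T
	cur   int
	err   error
}

func (s *sliceIter[T]) Next() bool { s.cur++; return s.cur <= len(s.items) }
func (s *sliceIter[T]) At() T      { return s.items[s.cur-1] }
func (s *sliceIter[T]) Err() error { return s.err }

// collect drains the iterator first, then checks Err() exactly once afterwards,
// returning nothing on failure instead of a partially built result.
func collect[T any](it Iterator[T]) ([]T, error) {
	var out []T
	for it.Next() {
		out = append(out, it.At())
	}
	if err := it.Err(); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	ok := &sliceIter[int]{items: []int{1, 2, 3}}
	fmt.Println(collect[int](ok)) // [1 2 3] <nil>

	failing := &sliceIter[int]{items: []int{1}, err: errors.New("block load failed")}
	fmt.Println(collect[int](failing)) // [] block load failed
}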
15 changes: 13 additions & 2 deletions pkg/bloomcompactor/bloomcompactor.go
@@ -214,6 +214,7 @@ func (c *Compactor) ownsTenant(tenant string) (v1.FingerprintBounds, bool, error

// runs a single round of compaction for all relevant tenants and tables
func (c *Compactor) runOne(ctx context.Context) error {
level.Info(c.logger).Log("msg", "running bloom compaction", "workers", c.cfg.WorkerParallelism)
var workersErr error
var wg sync.WaitGroup
ch := make(chan tenantTable)
@@ -226,7 +227,11 @@ func (c *Compactor) runOne(ctx context.Context) error {
err := c.loadWork(ctx, ch)

wg.Wait()
return multierror.New(workersErr, err, ctx.Err()).Err()
err = multierror.New(workersErr, err, ctx.Err()).Err()
if err != nil {
level.Error(c.logger).Log("msg", "compaction iteration failed", "err", err)
}
return err
}

func (c *Compactor) tables(ts time.Time) *dayRangeIterator {
@@ -241,6 +246,7 @@ func (c *Compactor) tables(ts time.Time) *dayRangeIterator {

fromDay := config.NewDayTime(model.TimeFromUnixNano(from))
throughDay := config.NewDayTime(model.TimeFromUnixNano(through))
level.Debug(c.logger).Log("msg", "loaded tables for compaction", "from", fromDay, "through", throughDay)
return newDayRangeIterator(fromDay, throughDay, c.schemaCfg)
}

@@ -250,6 +256,8 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
for tables.Next() && tables.Err() == nil && ctx.Err() == nil {
table := tables.At()

level.Debug(c.logger).Log("msg", "loading work for table", "table", table)

tenants, err := c.tenants(ctx, table)
if err != nil {
return errors.Wrap(err, "getting tenants")
@@ -262,6 +270,7 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
if err != nil {
return errors.Wrap(err, "checking tenant ownership")
}
level.Debug(c.logger).Log("msg", "enqueueing work for tenant", "tenant", tenant, "table", table, "ownership", ownershipRange.String(), "owns", owns)
if !owns {
c.metrics.tenantsSkipped.Inc()
continue
@@ -280,12 +289,14 @@ func (c *Compactor) loadWork(ctx context.Context, ch chan<- tenantTable) error {
}

if err := tenants.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tenants", "err", err)
return errors.Wrap(err, "iterating tenants")
}

}

if err := tables.Err(); err != nil {
level.Error(c.logger).Log("msg", "error iterating tables", "err", err)
return errors.Wrap(err, "iterating tables")
}

@@ -330,7 +341,7 @@ func (c *Compactor) runWorkers(ctx context.Context, ch <-chan tenantTable) error
}

func (c *Compactor) compactTenantTable(ctx context.Context, tt tenantTable) error {
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange)
level.Info(c.logger).Log("msg", "compacting", "org_id", tt.tenant, "table", tt.table, "ownership", tt.ownershipRange.String())
return c.controller.compactTenant(ctx, tt.table, tt.tenant, tt.ownershipRange)
}

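The bloomcompactor.go changes above add logging around an existing worker-pool shape: a producer enqueues tenant/table pairs on a channel, workers drain it under a WaitGroup, and the producer error, worker error, and context error are folded into one result that is logged when non-nil. The following is a hedged sketch of that shape using only the standard library; errors.Join stands in for the multierror helper, fmt stands in for the structured logger, and the tenantTable fields are invented for the example.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// tenantTable is a placeholder for the work unit passed from the producer to the workers.
type tenantTable struct{ tenant, table string }

// runOne mirrors the shape of a single compaction round: spawn workers, load work,
// wait for the workers, then combine and log all errors.
func runOne(ctx context.Context, workers int,
	load func(context.Context, chan<- tenantTable) error,
	work func(context.Context, tenantTable) error,
) error {
	var (
		mu         sync.Mutex
		workersErr error
		wg         sync.WaitGroup
	)
	ch := make(chan tenantTable)

	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for tt := range ch {
				if err := work(ctx, tt); err != nil {
					mu.Lock()
					workersErr = errors.Join(workersErr, err)
					mu.Unlock()
				}
			}
		}()
	}

	err := load(ctx, ch) // the producer is expected to close ch when it is done
	wg.Wait()

	combined := errors.Join(workersErr, err, ctx.Err())
	if combined != nil {
		fmt.Println("compaction iteration failed:", combined)
	}
	return combined
}

func main() {
	load := func(ctx context.Context, ch chan<- tenantTable) error {
		defer close(ch)
		for _, tt := range []tenantTable{{"tenant-a", "table_1"}, {"tenant-b", "table_1"}} {
			select {
			case ch <- tt:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
	work := func(_ context.Context, tt tenantTable) error {
		fmt.Println("compacting", tt.tenant, tt.table)
		return nil
	}
	_ = runOne(context.Background(), 2, load, work)
}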
148 changes: 112 additions & 36 deletions pkg/bloomcompactor/controller.go
@@ -70,7 +70,7 @@ func (s *SimpleBloomController) compactTenant(
tenant string,
ownershipRange v1.FingerprintBounds,
) error {
logger := log.With(s.logger, "ownership", ownershipRange, "org_id", tenant, "table", table.Addr())
logger := log.With(s.logger, "org_id", tenant, "table", table.Addr(), "ownership", ownershipRange.String())

client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
@@ -92,6 +92,15 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to get metas")
}

level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metas))

// fetch all metas overlapping our ownership range so we can safely
// check which metas can be deleted even if they only partially overlap our ownership range
superset, err := s.fetchSuperSet(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
return errors.Wrap(err, "failed to fetch superset")
}

// build compaction plans
work, err := s.findOutdatedGaps(ctx, tenant, table, ownershipRange, metas, logger)
if err != nil {
@@ -104,6 +113,63 @@ func (s *SimpleBloomController) compactTenant(
return errors.Wrap(err, "failed to build gaps")
}

// combine built and superset metas
// in preparation for removing outdated ones
combined := append(superset, built...)

outdated := outdatedMetas(combined)
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))

var (
deletedMetas int
deletedBlocks int
)
defer func() {
s.metrics.metasDeleted.Add(float64(deletedMetas))
s.metrics.blocksDeleted.Add(float64(deletedBlocks))
}()

for _, meta := range outdated {
for _, block := range meta.Blocks {
err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block.String())
} else {
level.Error(logger).Log("msg", "failed to delete block", "err", err, "block", block.String())
return errors.Wrap(err, "failed to delete block")
}
}
deletedBlocks++
level.Debug(logger).Log("msg", "removed outdated block", "block", block.String())
}

err = client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef})
if err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef.String())
} else {
level.Error(logger).Log("msg", "failed to delete meta", "err", err, "meta", meta.MetaRef.String())
return errors.Wrap(err, "failed to delete meta")
}
}
deletedMetas++
level.Debug(logger).Log("msg", "removed outdated meta", "meta", meta.MetaRef.String())
}

level.Debug(logger).Log("msg", "finished compaction")
return nil
}

// fetchSuperSet fetches all metas which overlap the ownership range of the first set of metas we've resolved
func (s *SimpleBloomController) fetchSuperSet(
ctx context.Context,
tenant string,
table config.DayTable,
ownershipRange v1.FingerprintBounds,
metas []bloomshipper.Meta,
logger log.Logger,
) ([]bloomshipper.Meta, error) {
// in order to delete outdated metas which only partially fall within the ownership range,
// we need to fetch all metas in the entire bound range of the first set of metas we've resolved
/*
@@ -121,55 +187,49 @@ func (s *SimpleBloomController) compactTenant(
union := superset.Union(meta.Bounds)
if len(union) > 1 {
level.Error(logger).Log("msg", "meta bounds union is not a single range", "union", union)
return errors.New("meta bounds union is not a single range")
return nil, errors.New("meta bounds union is not a single range")
}
superset = union[0]
}

metas, err = s.bloomStore.FetchMetas(
within := superset.Within(ownershipRange)
level.Debug(logger).Log(
"msg", "looking for superset metas",
"superset", superset.String(),
"superset_within", within,
)

if within {
// we don't need to fetch any more metas
// NB(owen-d): here we copy metas into the output. This is slightly inefficient, but
// helps prevent mutability bugs by returning the same slice as the input.
results := make([]bloomshipper.Meta, len(metas))
copy(results, metas)
return results, nil
}

supersetMetas, err := s.bloomStore.FetchMetas(
ctx,
bloomshipper.MetaSearchParams{
TenantID: tenant,
Interval: bloomshipper.NewInterval(table.Bounds()),
Keyspace: superset,
},
)

if err != nil {
level.Error(logger).Log("msg", "failed to get meta superset range", "err", err, "superset", superset)
return errors.Wrap(err, "failed to get meta supseret range")
return nil, errors.Wrap(err, "failed to get meta supseret range")
}

// combine built and pre-existing metas
// in preparation for removing outdated metas
metas = append(metas, built...)

outdated := outdatedMetas(metas)
for _, meta := range outdated {
for _, block := range meta.Blocks {
if err := client.DeleteBlocks(ctx, []bloomshipper.BlockRef{block}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "block not found while attempting delete, continuing", "block", block)
continue
}

level.Error(logger).Log("msg", "failed to delete blocks", "err", err)
return errors.Wrap(err, "failed to delete blocks")
}
}

if err := client.DeleteMetas(ctx, []bloomshipper.MetaRef{meta.MetaRef}); err != nil {
if client.IsObjectNotFoundErr(err) {
level.Debug(logger).Log("msg", "meta not found while attempting delete, continuing", "meta", meta.MetaRef)
} else {
level.Error(logger).Log("msg", "failed to delete metas", "err", err)
return errors.Wrap(err, "failed to delete metas")
}
}
}

level.Debug(logger).Log("msg", "finished compaction")
return nil
level.Debug(logger).Log(
"msg", "found superset metas",
"metas", len(metas),
"fresh_metas", len(supersetMetas),
"delta", len(supersetMetas)-len(metas),
)

return supersetMetas, nil
}

func (s *SimpleBloomController) findOutdatedGaps(
@@ -271,6 +331,7 @@ func (s *SimpleBloomController) buildGaps(

for i := range plan.gaps {
gap := plan.gaps[i]
logger := log.With(logger, "gap", gap.bounds.String(), "tsdb", plan.tsdb.Name())

meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
@@ -304,9 +365,11 @@ func (s *SimpleBloomController) buildGaps(
blocksIter,
s.rwFn,
s.metrics,
log.With(logger, "tsdb", plan.tsdb.Name(), "ownership", gap),
logger,
)

level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.blocks))

newBlocks := gen.Generate(ctx)
if err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
@@ -333,6 +396,16 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()
return nil, errors.Wrap(err, "failed to write block")
}
s.metrics.blocksCreated.Inc()

totalGapKeyspace := (gap.bounds.Max - gap.bounds.Min)
progress := (built.Bounds.Max - gap.bounds.Min)
pct := float64(progress) / float64(totalGapKeyspace) * 100
level.Debug(logger).Log(
"msg", "uploaded block",
"block", built.BlockRef.String(),
"progress_pct", fmt.Sprintf("%.2f", pct),
)

meta.Blocks = append(meta.Blocks, built.BlockRef)
}
@@ -346,6 +419,7 @@ func (s *SimpleBloomController) buildGaps(
blocksIter.Close()

// Write the new meta
// TODO(owen-d): put total size in log, total time in metrics+log
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
@@ -357,8 +431,10 @@ func (s *SimpleBloomController) buildGaps(
level.Error(logger).Log("msg", "failed to write meta", "err", err)
return nil, errors.Wrap(err, "failed to write meta")
}
created = append(created, meta)
s.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta", "meta", meta.MetaRef.String())

created = append(created, meta)
totalSeries += uint64(seriesItrWithCounter.Count())
}
}
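The controller.go changes move deletion of outdated metas and their blocks into compactTenant and tolerate object-not-found errors, so a retried round can finish cleanup after an earlier partial failure instead of aborting. Below is a small, hedged sketch of that idempotent-delete idea; the notFoundErr sentinel, the del callback, and the in-memory store are invented for illustration and are not Loki's bloomshipper API.

package main

import (
	"context"
	"errors"
	"fmt"
)

// notFoundErr marks deletions of objects that no longer exist; a hypothetical
// stand-in for the client.IsObjectNotFoundErr check in the real code.
var notFoundErr = errors.New("object not found")

// deleteAll removes every key, treating "already gone" as success so a retried
// compaction round can clean up after a previous partial failure.
func deleteAll(ctx context.Context, del func(context.Context, string) error, keys []string) (deleted int, err error) {
	for _, key := range keys {
		if err := del(ctx, key); err != nil {
			if errors.Is(err, notFoundErr) {
				fmt.Println("already deleted, continuing:", key)
			} else {
				return deleted, fmt.Errorf("failed to delete %s: %w", key, err)
			}
		}
		deleted++
	}
	return deleted, nil
}

func main() {
	store := map[string]struct{}{"bloom/block-a": {}, "bloom/meta-1": {}}
	del := func(_ context.Context, key string) error {
		if _, ok := store[key]; !ok {
			return notFoundErr
		}
		delete(store, key)
		return nil
	}
	n, err := deleteAll(context.Background(), del, []string{"bloom/block-a", "bloom/block-b", "bloom/meta-1"})
	fmt.Println("deleted:", n, "err:", err)
}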

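One of the new debug logs in buildGaps reports how far block generation has progressed through a gap's fingerprint keyspace. The arithmetic is simple enough to show standalone; FingerprintBounds below is a reduced stand-in for the v1 type, and the numbers are made up.

package main

import "fmt"

// FingerprintBounds is a reduced stand-in for v1.FingerprintBounds: an inclusive
// range of series fingerprints.
type FingerprintBounds struct{ Min, Max uint64 }

// progressPct reports how much of the gap has been covered once blocks up to
// builtMax have been written, matching the "progress_pct" field in the new log line.
func progressPct(gap FingerprintBounds, builtMax uint64) float64 {
	total := gap.Max - gap.Min
	if total == 0 {
		return 100
	}
	return float64(builtMax-gap.Min) / float64(total) * 100
}

func main() {
	gap := FingerprintBounds{Min: 0x1000, Max: 0x2000}
	fmt.Printf("progress_pct=%.2f\n", progressPct(gap, 0x1800)) // 50.00
}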