From fc12a80cdfd400960645499aa451b2291046978a Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 15 Oct 2024 02:56:32 -0700 Subject: [PATCH 01/12] go/cmd/dolt: commands/gc: Add flag handling for --full flag. --- go/cmd/dolt/cli/arg_parser_helpers.go | 1 + go/cmd/dolt/cli/flags.go | 1 + go/cmd/dolt/commands/gc.go | 19 +++++++++++++++++-- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/go/cmd/dolt/cli/arg_parser_helpers.go b/go/cmd/dolt/cli/arg_parser_helpers.go index 4057974fbf2..dba70b3c1b9 100644 --- a/go/cmd/dolt/cli/arg_parser_helpers.go +++ b/go/cmd/dolt/cli/arg_parser_helpers.go @@ -294,6 +294,7 @@ func CreateLogArgParser(isTableFunction bool) *argparser.ArgParser { func CreateGCArgParser() *argparser.ArgParser { ap := argparser.NewArgParserWithMaxArgs("gc", 0) ap.SupportsFlag(ShallowFlag, "s", "perform a fast, but incomplete garbage collection pass") + ap.SupportsFlag(FullFlag, "f", "perform a full garbage collection, including the old generation") return ap } diff --git a/go/cmd/dolt/cli/flags.go b/go/cmd/dolt/cli/flags.go index e1602e93b3a..13f72390853 100644 --- a/go/cmd/dolt/cli/flags.go +++ b/go/cmd/dolt/cli/flags.go @@ -37,6 +37,7 @@ const ( DryRunFlag = "dry-run" EmptyParam = "empty" ForceFlag = "force" + FullFlag = "full" GraphFlag = "graph" HardResetParam = "hard" HostFlag = "host" diff --git a/go/cmd/dolt/commands/gc.go b/go/cmd/dolt/commands/gc.go index c99000fbdd4..96364d01d4d 100644 --- a/go/cmd/dolt/commands/gc.go +++ b/go/cmd/dolt/commands/gc.go @@ -33,9 +33,17 @@ var gcDocs = cli.CommandDocumentationContent{ ShortDesc: "Cleans up unreferenced data from the repository.", LongDesc: `Searches the repository for data that is no longer referenced and no longer needed. -If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed.`, +Dolt GC is generational. When a GC is run, everything reachable from any commit on any branch +is put into the old generation. Data which is only reachable from uncommited branch HEADs is kept in +the new generation. By default, Dolt GC will only visit data in the new generation, and so will never +collect data from deleted branches which has previously made its way to the old generation from being +copied during a prior garbage collection. + +If the {{.EmphasisLeft}}--shallow{{.EmphasisRight}} flag is supplied, a faster but less thorough garbage collection will be performed. + +If the {{.EmphasisLeft}}--full{{.EmphasisRight}} flag is supplied, a more thorough garbage collection, fully collecting the old gen and new gen, will be performed.`, Synopsis: []string{ - "[--shallow]", + "[--shallow|--full]", }, } @@ -83,6 +91,10 @@ func (cmd GarbageCollectionCmd) Exec(ctx context.Context, commandStr string, arg help, usage := cli.HelpAndUsagePrinters(cli.CommandDocsForCommandString(commandStr, gcDocs, ap)) apr := cli.ParseArgsOrDie(ap, args, help) + if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) { + return HandleVErrAndExitCode(errhand.BuildDError("Invalid Argument: --shallow is not compatible with --full").SetPrintUsage().Build(), usage) + } + queryist, sqlCtx, closeFunc, err := cliCtx.QueryEngine(ctx) if err != nil { return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage) @@ -110,6 +122,9 @@ func constructDoltGCQuery(apr *argparser.ArgParseResults) (string, error) { if apr.Contains(cli.ShallowFlag) { query += "'--shallow'" } + if apr.Contains(cli.FullFlag) { + query += "'--full'" + } query += ")" return query, nil } From c4e485f57e0b2718db3042f2adf9d96e09913fec Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 15 Oct 2024 03:59:12 -0700 Subject: [PATCH 02/12] go/libraries/doltcore/{doltdb,sqle/dprocedures}: Thread full/default mode GC through to doltdb.DDB.GC call. --- go/libraries/doltcore/doltdb/doltdb.go | 8 +++++++- go/libraries/doltcore/doltdb/gc_test.go | 4 ++-- go/libraries/doltcore/sqle/dprocedures/dolt_gc.go | 14 ++++++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index a3dfff801e6..6e2be00c87a 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1623,6 +1623,12 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error { return datas.ChunkStoreFromDatabase(ddb.db).Rebase(ctx) } +type GCMode int +const ( + GCModeDefault GCMode = iota + GCModeFull +) + // GC performs garbage collection on this ddb. // // If |safepointF| is non-nil, it will be called at some point after the GC begins @@ -1633,7 +1639,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error { // until no possibly-stale ChunkStore state is retained in memory, or failing // certain in-progress operations which cannot be finalized in a timely manner, // etc. -func (ddb *DoltDB) GC(ctx context.Context, safepointF func() error) error { +func (ddb *DoltDB) GC(ctx context.Context, mode GCMode, safepointF func() error) error { collector, ok := ddb.db.Database.(datas.GarbageCollector) if !ok { return fmt.Errorf("this database does not support garbage collection") diff --git a/go/libraries/doltcore/doltdb/gc_test.go b/go/libraries/doltcore/doltdb/gc_test.go index f99b30d0a73..6a5a09fd2a2 100644 --- a/go/libraries/doltcore/doltdb/gc_test.go +++ b/go/libraries/doltcore/doltdb/gc_test.go @@ -139,7 +139,7 @@ func testGarbageCollection(t *testing.T, test gcTest) { } } - err := dEnv.DoltDB.GC(ctx, nil) + err := dEnv.DoltDB.GC(ctx, doltdb.GCModeDefault, nil) require.NoError(t, err) test.postGCFunc(ctx, t, dEnv.DoltDB, res) @@ -208,7 +208,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) { _, err = ns.Write(ctx, c1.Node()) require.NoError(t, err) - err = ddb.GC(ctx, nil) + err = ddb.GC(ctx, doltdb.GCModeDefault, nil) require.NoError(t, err) c2 := newIntMap(t, ctx, ns, 2, 2) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 44ee99f7e21..92c61451cea 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -26,6 +26,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" ) @@ -72,7 +73,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { } if apr.NArg() != 0 { - return cmdFailure, InvalidArgErr + return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr) } dSess := dsess.DSessFromSess(ctx.Session) @@ -81,6 +82,10 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdFailure, fmt.Errorf("Could not load database %s", dbName) } + if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) { + return cmdFailure, InvalidArgErr + } + if apr.Contains(cli.ShallowFlag) { err = ddb.ShallowGC(ctx) if err != nil { @@ -106,10 +111,15 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { origepoch = epoch.(int) } + var mode doltdb.GCMode = doltdb.GCModeDefault + if apr.Contains(cli.FullFlag) { + mode = doltdb.GCModeFull + } + // TODO: If we got a callback at the beginning and an // (allowed-to-block) callback at the end, we could more // gracefully tear things down. - err = ddb.GC(ctx, func() error { + err = ddb.GC(ctx, mode, func() error { if origepoch != -1 { // Here we need to sanity check role and epoch. if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { From cc47d3f257edda4d28b718804748e08aca083a72 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 15 Oct 2024 23:43:17 -0700 Subject: [PATCH 03/12] go/store/nbs: store.go: Add error checking for case where destNBS is not a *NomsBlockStore. --- go/store/nbs/store.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index e17f1e847a8..6fa6060cd85 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1592,14 +1592,18 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan return err } - destNBS := nbs + var destNBS *NomsBlockStore if dest != nil { switch typed := dest.(type) { case *NomsBlockStore: destNBS = typed case NBSMetricWrapper: destNBS = typed.nbs + default: + return fmt.Errorf("cannot MarkAndSweep into a non-NomsBlockStore ChunkStore: %w", chunks.ErrUnsupportedOperation) } + } else { + destNBS = nbs } specs, err := nbs.copyMarkedChunks(ctx, hashes, destNBS) From 38a9720481c05d8384cd9e66f3c0d76755df0208 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 16 Oct 2024 00:30:01 -0700 Subject: [PATCH 04/12] go/store/{chunks,nbs}: Move to a model where MarkAndSweepChunks returns a finalizer. Previously MarkAndSweepChunks would finalize itself by either appending the table table files to the store or by swaping to the table files. Now it returns a GCFinalizer instance which is used by the caller to effect the desired change. This is necessary for implementing Full GC, where we actually want to do both with the same GC result, but at different phases of the GC process. This commit leaves the HasManyF return from NomsBlockStore.MarkAndSweepChunks as a TODO. --- go/store/chunks/chunk_store.go | 28 +++++++++++++++++- go/store/chunks/memory_store.go | 29 +++++++++++++------ go/store/chunks/test_utils.go | 4 +-- go/store/nbs/nbs_metrics_wrapper.go | 2 +- go/store/nbs/store.go | 37 ++++++++++++++++-------- go/store/nbs/store_test.go | 6 +++- go/store/types/value_store.go | 44 +++++++++++++++++++++++------ 7 files changed, 117 insertions(+), 33 deletions(-) diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index e1bed5c74c9..17143c100e2 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -152,6 +152,32 @@ type LoggingChunkStore interface { var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block") +type HasManyF func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) + +// A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed. +// +// A GCFinalizer is a handle to one or more table files which has been +// constructed as part of the GC process. It can be used to add the table files +// to the existing store, as we do in the case of a default-mode collection +// into the old gen, and it can be used to replace all existing table files in +// the store with the new table files, as we do in the collection into the new +// gen. +// +// In addition, adding the table files to an existing store exposes a HasMany +// implementation which inspects only the table files that were added, not all +// the table files in the resulting store. This is an important part of the +// full gc protocol, which works as follows: +// +// * Collect everything reachable from old gen refs into a new table file in the old gen. +// * Add the new table file to the old gen. +// * Collect everything reachable from new gen refs into the new gen, skipping stuff that is in the new old gen table file. +// * Swap to the new gen table file. +// * Swap to the old gen table file. +type GCFinalizer interface { + AddChunksToStore(ctx context.Context) (HasManyF, error) + SwapChunksInStore(ctx context.Context) error +} + // ChunkStoreGarbageCollector is a ChunkStore that supports garbage collection. type ChunkStoreGarbageCollector interface { ChunkStore @@ -185,7 +211,7 @@ type ChunkStoreGarbageCollector interface { // This behavior is a little different for ValueStore.GC()'s // interactions with generational stores. See ValueStore and // NomsBlockStore/GenerationalNBS for details. - MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error + MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) // Count returns the number of chunks in the store. Count() (uint32, error) diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 485eafea4ba..38aa82ee024 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -343,7 +343,24 @@ func (ms *MemoryStoreView) EndGC() { ms.transitionToNoGC() } -func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error { +type msvGcFinalizer struct { + ms *MemoryStoreView + keepers map[hash.Hash]Chunk +} + +func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyF, error) { + panic("unsupported") +} + +func (mgcf msvGcFinalizer) SwapChunksInStore(ctx context.Context) error { + mgcf.ms.mu.Lock() + defer mgcf.ms.mu.Unlock() + mgcf.ms.storage = &MemoryStorage{rootHash: mgcf.ms.rootHash, data: mgcf.keepers} + mgcf.ms.pending = map[hash.Hash]Chunk{} + return nil +} + +func (ms *MemoryStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) { if dest != ms { panic("unsupported") } @@ -366,20 +383,16 @@ LOOP: for _, h := range hs { c, err := ms.Get(ctx, h) if err != nil { - return err + return nil, err } keepers[h] = c } case <-ctx.Done(): - return ctx.Err() + return nil, ctx.Err() } } - ms.mu.Lock() - defer ms.mu.Unlock() - ms.storage = &MemoryStorage{rootHash: ms.rootHash, data: keepers} - ms.pending = map[hash.Hash]Chunk{} - return nil + return msvGcFinalizer{ms, keepers}, nil } func (ms *MemoryStoreView) Count() (uint32, error) { diff --git a/go/store/chunks/test_utils.go b/go/store/chunks/test_utils.go index 037210f3c7c..cf410c6afbc 100644 --- a/go/store/chunks/test_utils.go +++ b/go/store/chunks/test_utils.go @@ -91,10 +91,10 @@ func (s *TestStoreView) EndGC() { collector.EndGC() } -func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) error { +func (s *TestStoreView) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest ChunkStore) (GCFinalizer, error) { collector, ok := s.ChunkStore.(ChunkStoreGarbageCollector) if !ok || dest != s { - return ErrUnsupportedOperation + return nil, ErrUnsupportedOperation } return collector.MarkAndSweepChunks(ctx, hashes, collector) } diff --git a/go/store/nbs/nbs_metrics_wrapper.go b/go/store/nbs/nbs_metrics_wrapper.go index 78b7a75ec78..0294e992021 100644 --- a/go/store/nbs/nbs_metrics_wrapper.go +++ b/go/store/nbs/nbs_metrics_wrapper.go @@ -79,7 +79,7 @@ func (nbsMW *NBSMetricWrapper) EndGC() { nbsMW.nbs.EndGC() } -func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error { +func (nbsMW *NBSMetricWrapper) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) { return nbsMW.nbs.MarkAndSweepChunks(ctx, hashes, dest) } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 6fa6060cd85..0acb4bbe6da 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1569,10 +1569,10 @@ func (nbs *NomsBlockStore) EndGC() { nbs.cond.Broadcast() } -func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) error { +func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) { ops := nbs.SupportedOperations() if !ops.CanGC || !ops.CanPrune { - return chunks.ErrUnsupportedOperation + return nil, chunks.ErrUnsupportedOperation } precheck := func() error { @@ -1589,7 +1589,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan } err := precheck() if err != nil { - return err + return nil, err } var destNBS *NomsBlockStore @@ -1600,7 +1600,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan case NBSMetricWrapper: destNBS = typed.nbs default: - return fmt.Errorf("cannot MarkAndSweep into a non-NomsBlockStore ChunkStore: %w", chunks.ErrUnsupportedOperation) + return nil, fmt.Errorf("cannot MarkAndSweep into a non-NomsBlockStore ChunkStore: %w", chunks.ErrUnsupportedOperation) } } else { destNBS = nbs @@ -1608,18 +1608,31 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan specs, err := nbs.copyMarkedChunks(ctx, hashes, destNBS) if err != nil { - return err + return nil, err } if ctx.Err() != nil { - return ctx.Err() + return nil, ctx.Err() } - if destNBS == nbs { - return nbs.swapTables(ctx, specs) - } else { - fileIdToNumChunks := tableSpecsToMap(specs) - return destNBS.AddTableFilesToManifest(ctx, fileIdToNumChunks) - } + return gcFinalizer{ + nbs: destNBS, + specs: specs, + }, nil +} + +type gcFinalizer struct { + nbs *NomsBlockStore + specs []tableSpec +} + +func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyF, error) { + // TODO: HasManyF + fileIdToNumChunks := tableSpecsToMap(gcf.specs) + return nil, gcf.nbs.AddTableFilesToManifest(ctx, fileIdToNumChunks) +} + +func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error { + return gcf.nbs.swapTables(ctx, gcf.specs) } func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) { diff --git a/go/store/nbs/store_test.go b/go/store/nbs/store_test.go index dbb3ba9eb6b..56416130a29 100644 --- a/go/store/nbs/store_test.go +++ b/go/store/nbs/store_test.go @@ -339,7 +339,11 @@ func TestNBSCopyGC(t *testing.T) { wg.Add(1) go func() { require.NoError(t, st.BeginGC(nil)) - msErr = st.MarkAndSweepChunks(ctx, keepChan, nil) + var finalizer chunks.GCFinalizer + finalizer, msErr = st.MarkAndSweepChunks(ctx, keepChan, nil) + if msErr == nil { + msErr = finalizer.SwapChunksInStore(ctx) + } st.EndGC() wg.Done() }() diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index ba4b493931e..bcf1967875c 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -598,7 +598,8 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS newGenRefs.Insert(root) - err = lvs.gc(ctx, oldGenRefs, oldGen.HasMany, newGen, oldGen, nil, func() hash.HashSet { + var finalizer chunks.GCFinalizer + finalizer, err = lvs.gc(ctx, oldGenRefs, oldGen.HasMany, newGen, oldGen, nil, func() hash.HashSet { n := lvs.transitionToNewGenGC() newGenRefs.InsertAll(n) return make(hash.HashSet) @@ -608,12 +609,25 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS return err } - err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC) - newGen.EndGC() + _, err = finalizer.AddChunksToStore(ctx) if err != nil { + newGen.EndGC() + return err + } + + finalizer, err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC) + if err != nil { + newGen.EndGC() return err } + err = finalizer.SwapChunksInStore(ctx) + if err != nil { + newGen.EndGC() + return err + } + + newGen.EndGC() } else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok { extraNewGenRefs := lvs.transitionToNewGenGC() newGenRefs.InsertAll(extraNewGenRefs) @@ -638,11 +652,20 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS newGenRefs.Insert(root) - err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC) - collector.EndGC() + var finalizer chunks.GCFinalizer + finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC) + if err != nil { + collector.EndGC() + return err + } + + err = finalizer.SwapChunksInStore(ctx) if err != nil { + collector.EndGC() return err } + + collector.EndGC() } else { return chunks.ErrUnsupportedOperation } @@ -663,12 +686,16 @@ func (lvs *ValueStore) gc(ctx context.Context, hashFilter HashFilterFunc, src, dest chunks.ChunkStoreGarbageCollector, safepointF func() error, - finalize func() hash.HashSet) error { + finalize func() hash.HashSet) (chunks.GCFinalizer, error) { keepChunks := make(chan []hash.Hash, gcBuffSize) + var gcFinalizer chunks.GCFinalizer + eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { - return src.MarkAndSweepChunks(ctx, keepChunks, dest) + var err error + gcFinalizer, err = src.MarkAndSweepChunks(ctx, keepChunks, dest) + return err }) keepHashes := func(hs []hash.Hash) error { @@ -706,7 +733,8 @@ func (lvs *ValueStore) gc(ctx context.Context, return nil }) - return eg.Wait() + err := eg.Wait() + return gcFinalizer, err } func (lvs *ValueStore) gcProcessRefs(ctx context.Context, From e37acffbb7cf09f7ff5675bcb7d208cb4d50d4fd Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 16 Oct 2024 23:54:02 -0700 Subject: [PATCH 05/12] go/store/nbs: Implement GCFinalizer HasManyF so that the table files added to the store as part of the GC can be checked for their contents. --- go/libraries/doltcore/doltdb/doltdb.go | 1 + go/store/nbs/store.go | 37 +++++++++++++++++++++++++- go/store/nbs/table_set.go | 29 ++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 6e2be00c87a..7f5d3e365b0 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1624,6 +1624,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error { } type GCMode int + const ( GCModeDefault GCMode = iota GCModeFull diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 0acb4bbe6da..b4d103b326a 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1037,6 +1037,34 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha return nbs.hasMany(toHasRecords(hashes)) } +func (nbs *NomsBlockStore) hasManyInSources(ctx context.Context, srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) { + if hashes.Size() == 0 { + return nil, nil + } + + t1 := time.Now() + defer nbs.stats.HasLatency.SampleTimeSince(t1) + nbs.stats.AddressesPerHas.SampleLen(hashes.Size()) + + nbs.mu.RLock() + defer nbs.mu.RUnlock() + + records := toHasRecords(hashes) + + _, err := nbs.tables.hasManyInSources(srcs, records) + if err != nil { + return nil, err + } + + absent := hash.HashSet{} + for _, r := range records { + if !r.has { + absent.Insert(*r.a) + } + } + return absent, nil +} + func (nbs *NomsBlockStore) hasMany(reqs []hasRecord) (hash.HashSet, error) { tables, remaining, err := func() (tables chunkReader, remaining bool, err error) { tables = nbs.tables @@ -1628,7 +1656,14 @@ type gcFinalizer struct { func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyF, error) { // TODO: HasManyF fileIdToNumChunks := tableSpecsToMap(gcf.specs) - return nil, gcf.nbs.AddTableFilesToManifest(ctx, fileIdToNumChunks) + var addrs []hash.Hash + for _, spec := range gcf.specs { + addrs = append(addrs, spec.name) + } + f := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { + return gcf.nbs.hasManyInSources(ctx, addrs, hashes) + } + return f, gcf.nbs.AddTableFilesToManifest(ctx, fileIdToNumChunks) } func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error { diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index cddb4699029..e58acbe6386 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -115,6 +115,35 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { return f(ts.upstream) } +func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remaining bool, err error) { + for _, rec := range addrs { + if !rec.has { + remaining = true + break + } + } + if !remaining { + return false, nil + } + for _, srcAddr := range srcs { + src, ok := ts.novel[srcAddr] + if !ok { + src, ok = ts.upstream[srcAddr] + if !ok { + continue + } + } + remaining, err = src.hasMany(addrs) + if err != nil { + return false, err + } + if !remaining { + break + } + } + return remaining, nil +} + func (ts tableSet) get(ctx context.Context, h hash.Hash, stats *Stats) ([]byte, error) { if err := ctx.Err(); err != nil { return nil, err From 95e2adb3e0c68f3ec6e482923b9b3015dc215d25 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 01:37:16 -0700 Subject: [PATCH 06/12] go/store/types: Implement logic for a full GC. This does not yet work because the source ChunkStore is currently newGen. Still reworking so that the generational store itself is the source. --- go/libraries/doltcore/doltdb/doltdb.go | 11 +--- go/libraries/doltcore/doltdb/gc_test.go | 4 +- .../doltcore/sqle/dprocedures/dolt_gc.go | 6 +-- go/store/datas/database.go | 2 +- go/store/datas/database_common.go | 4 +- go/store/types/value_store.go | 52 +++++++++++++++---- go/store/types/value_store_test.go | 2 +- 7 files changed, 52 insertions(+), 29 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 7f5d3e365b0..5953db4d03d 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1623,13 +1623,6 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error { return datas.ChunkStoreFromDatabase(ddb.db).Rebase(ctx) } -type GCMode int - -const ( - GCModeDefault GCMode = iota - GCModeFull -) - // GC performs garbage collection on this ddb. // // If |safepointF| is non-nil, it will be called at some point after the GC begins @@ -1640,7 +1633,7 @@ const ( // until no possibly-stale ChunkStore state is retained in memory, or failing // certain in-progress operations which cannot be finalized in a timely manner, // etc. -func (ddb *DoltDB) GC(ctx context.Context, mode GCMode, safepointF func() error) error { +func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error { collector, ok := ddb.db.Database.(datas.GarbageCollector) if !ok { return fmt.Errorf("this database does not support garbage collection") @@ -1684,7 +1677,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode GCMode, safepointF func() error) return err } - return collector.GC(ctx, oldGen, newGen, safepointF) + return collector.GC(ctx, mode, oldGen, newGen, safepointF) } func (ddb *DoltDB) ShallowGC(ctx context.Context) error { diff --git a/go/libraries/doltcore/doltdb/gc_test.go b/go/libraries/doltcore/doltdb/gc_test.go index 6a5a09fd2a2..343c3213fbe 100644 --- a/go/libraries/doltcore/doltdb/gc_test.go +++ b/go/libraries/doltcore/doltdb/gc_test.go @@ -139,7 +139,7 @@ func testGarbageCollection(t *testing.T, test gcTest) { } } - err := dEnv.DoltDB.GC(ctx, doltdb.GCModeDefault, nil) + err := dEnv.DoltDB.GC(ctx, types.GCModeDefault, nil) require.NoError(t, err) test.postGCFunc(ctx, t, dEnv.DoltDB, res) @@ -208,7 +208,7 @@ func testGarbageCollectionHasCacheDataCorruptionBugFix(t *testing.T) { _, err = ns.Write(ctx, c1.Node()) require.NoError(t, err) - err = ddb.GC(ctx, doltdb.GCModeDefault, nil) + err = ddb.GC(ctx, types.GCModeDefault, nil) require.NoError(t, err) c2 := newIntMap(t, ctx, ns, 2, 2) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 92c61451cea..209457912da 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -26,8 +26,8 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" - "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/types" ) const ( @@ -111,9 +111,9 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { origepoch = epoch.(int) } - var mode doltdb.GCMode = doltdb.GCModeDefault + var mode types.GCMode = types.GCModeDefault if apr.Contains(cli.FullFlag) { - mode = doltdb.GCModeFull + mode = types.GCModeFull } // TODO: If we got a callback at the beginning and an diff --git a/go/store/datas/database.go b/go/store/datas/database.go index 56559d48a30..5b3913d0a2e 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -194,7 +194,7 @@ type GarbageCollector interface { // GC traverses the database starting at the Root and removes // all unreferenced data from persistent storage. - GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error + GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error } // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go index 8712f1bdfed..48075f99a5d 100644 --- a/go/store/datas/database_common.go +++ b/go/store/datas/database_common.go @@ -1148,8 +1148,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse } // GC traverses the database starting at the Root and removes all unreferenced data from persistent storage. -func (db *database) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { - return db.ValueStore.GC(ctx, oldGenRefs, newGenRefs, safepointF) +func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { + return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF) } func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index bcf1967875c..d8b2fec59cc 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -36,8 +36,6 @@ import ( "github.com/dolthub/dolt/go/store/util/sizecache" ) -type HashFilterFunc func(context.Context, hash.HashSet) (hash.HashSet, error) - func unfilteredHashFunc(_ context.Context, hs hash.HashSet) (hash.HashSet, error) { return hs, nil } @@ -563,8 +561,15 @@ func makeBatches(hss []hash.HashSet, count int) [][]hash.Hash { return res } +type GCMode int + +const ( + GCModeDefault GCMode = iota + GCModeFull +) + // GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore -func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { +func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { lvs.versOnce.Do(lvs.expectVersion) lvs.transitionToOldGenGC() @@ -574,6 +579,16 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS oldGen := gcs.OldGen() newGen := gcs.NewGen() + var oldGenHasMany chunks.HasManyF + switch mode { + case GCModeDefault: + oldGenHasMany = oldGen.HasMany + case GCModeFull: + oldGenHasMany = unfilteredHashFunc + default: + return fmt.Errorf("unsupported GCMode %v", mode) + } + err := newGen.BeginGC(lvs.gcAddChunk) if err != nil { return err @@ -591,15 +606,15 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS return nil } - oldGenRefs, err = oldGen.HasMany(ctx, oldGenRefs) + oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs) if err != nil { return err } newGenRefs.Insert(root) - var finalizer chunks.GCFinalizer - finalizer, err = lvs.gc(ctx, oldGenRefs, oldGen.HasMany, newGen, oldGen, nil, func() hash.HashSet { + var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer + oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, newGen, oldGen, nil, func() hash.HashSet { n := lvs.transitionToNewGenGC() newGenRefs.InsertAll(n) return make(hash.HashSet) @@ -609,19 +624,34 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS return err } - _, err = finalizer.AddChunksToStore(ctx) + var newFileHasMany chunks.HasManyF + newFileHasMany, err = oldGenFinalizer.AddChunksToStore(ctx) if err != nil { newGen.EndGC() return err } - finalizer, err = lvs.gc(ctx, newGenRefs, oldGen.HasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC) + if mode == GCModeDefault { + oldGenHasMany = oldGen.HasMany + } else { + oldGenHasMany = newFileHasMany + } + + newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC) if err != nil { newGen.EndGC() return err } - err = finalizer.SwapChunksInStore(ctx) + if mode == GCModeFull { + err = oldGenFinalizer.SwapChunksInStore(ctx) + if err != nil { + newGen.EndGC() + return err + } + } + + err = newGenFinalizer.SwapChunksInStore(ctx) if err != nil { newGen.EndGC() return err @@ -683,7 +713,7 @@ func (lvs *ValueStore) GC(ctx context.Context, oldGenRefs, newGenRefs hash.HashS func (lvs *ValueStore) gc(ctx context.Context, toVisit hash.HashSet, - hashFilter HashFilterFunc, + hashFilter chunks.HasManyF, src, dest chunks.ChunkStoreGarbageCollector, safepointF func() error, finalize func() hash.HashSet) (chunks.GCFinalizer, error) { @@ -739,7 +769,7 @@ func (lvs *ValueStore) gc(ctx context.Context, func (lvs *ValueStore) gcProcessRefs(ctx context.Context, initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error, - walker *parallelRefWalker, hashFilter HashFilterFunc, + walker *parallelRefWalker, hashFilter chunks.HasManyF, safepointF func() error, finalize func() hash.HashSet) error { visited := make(hash.HashSet) diff --git a/go/store/types/value_store_test.go b/go/store/types/value_store_test.go index 5f15c3f9283..d797671c04c 100644 --- a/go/store/types/value_store_test.go +++ b/go/store/types/value_store_test.go @@ -198,7 +198,7 @@ func TestGC(t *testing.T) { require.NoError(t, err) assert.NotNil(v2) - err = vs.GC(ctx, hash.HashSet{}, hash.HashSet{}, nil) + err = vs.GC(ctx, GCModeDefault, hash.HashSet{}, hash.HashSet{}, nil) require.NoError(t, err) v1, err = vs.ReadValue(ctx, h1) // non-nil From 760ec2c94bb6f21200066e67d4e96ed11518b61e Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 02:07:21 -0700 Subject: [PATCH 07/12] go/store/nbs: Make a GenerationalNBS a ChunkStoreGarbageCollector. Change ValueStore's handling of generational chunk stores, so that it sweeps chunks from the generational store itself, and into new gen and old gen. This will support the full gc access pattern where we need to copy chunks from old gen into a new table file which will appear in old gen itself. --- go/store/nbs/generational_chunk_store.go | 38 ++++++++++++++++++++++++ go/store/nbs/store.go | 10 +++++-- go/store/types/value_store.go | 29 ++++++++++-------- 3 files changed, 61 insertions(+), 16 deletions(-) diff --git a/go/store/nbs/generational_chunk_store.go b/go/store/nbs/generational_chunk_store.go index 67b28268ad6..03942c21353 100644 --- a/go/store/nbs/generational_chunk_store.go +++ b/go/store/nbs/generational_chunk_store.go @@ -30,6 +30,8 @@ import ( var _ chunks.ChunkStore = (*GenerationalNBS)(nil) var _ chunks.GenerationalCS = (*GenerationalNBS)(nil) var _ chunks.TableFileStore = (*GenerationalNBS)(nil) +var _ chunks.GenerationalCS = (*GenerationalNBS)(nil) +var _ chunks.ChunkStoreGarbageCollector = (*GenerationalNBS)(nil) type GenerationalNBS struct { oldGen *NomsBlockStore @@ -492,3 +494,39 @@ func (gcs *GenerationalNBS) Path() (string, bool) { func (gcs *GenerationalNBS) UpdateManifest(ctx context.Context, updates map[hash.Hash]uint32) (mi ManifestInfo, err error) { return gcs.newGen.UpdateManifest(ctx, updates) } + +func (gcs *GenerationalNBS) BeginGC(keeper func(hash.Hash) bool) error { + return gcs.newGen.BeginGC(keeper) +} + +func (gcs *GenerationalNBS) EndGC() { + gcs.newGen.EndGC() +} + +func (gcs *GenerationalNBS) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) { + return markAndSweepChunks(ctx, hashes, gcs.newGen, gcs, dest) +} + +func (gcs *GenerationalNBS) IterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk)) error { + err := gcs.newGen.IterateAllChunks(ctx, cb) + if err != nil { + return err + } + err = gcs.oldGen.IterateAllChunks(ctx, cb) + if err != nil { + return err + } + return nil +} + +func (gcs *GenerationalNBS) Count() (uint32, error) { + newGenCnt, err := gcs.newGen.Count() + if err != nil { + return 0, err + } + oldGenCnt, err := gcs.oldGen.Count() + if err != nil { + return 0, err + } + return newGenCnt + oldGenCnt, nil +} diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index b4d103b326a..76715710bd9 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1598,6 +1598,10 @@ func (nbs *NomsBlockStore) EndGC() { } func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, dest chunks.ChunkStore) (chunks.GCFinalizer, error) { + return markAndSweepChunks(ctx, hashes, nbs, nbs, dest) +} + +func markAndSweepChunks(ctx context.Context, hashes <-chan []hash.Hash, nbs *NomsBlockStore, src NBSCompressedChunkStore, dest chunks.ChunkStore) (chunks.GCFinalizer, error) { ops := nbs.SupportedOperations() if !ops.CanGC || !ops.CanPrune { return nil, chunks.ErrUnsupportedOperation @@ -1634,7 +1638,7 @@ func (nbs *NomsBlockStore) MarkAndSweepChunks(ctx context.Context, hashes <-chan destNBS = nbs } - specs, err := nbs.copyMarkedChunks(ctx, hashes, destNBS) + specs, err := copyMarkedChunks(ctx, hashes, src, destNBS) if err != nil { return nil, err } @@ -1670,7 +1674,7 @@ func (gcf gcFinalizer) SwapChunksInStore(ctx context.Context) error { return gcf.nbs.swapTables(ctx, gcf.specs) } -func (nbs *NomsBlockStore) copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, dest *NomsBlockStore) ([]tableSpec, error) { +func copyMarkedChunks(ctx context.Context, keepChunks <-chan []hash.Hash, src NBSCompressedChunkStore, dest *NomsBlockStore) ([]tableSpec, error) { tfp, ok := dest.p.(tableFilePersister) if !ok { return nil, fmt.Errorf("NBS does not support copying garbage collection") @@ -1694,7 +1698,7 @@ LOOP: mu := new(sync.Mutex) hashset := hash.NewHashSet(hs...) found := 0 - err := nbs.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) { + err := src.GetManyCompressed(ctx, hashset, func(ctx context.Context, c CompressedChunk) { mu.Lock() defer mu.Unlock() if addErr != nil { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index d8b2fec59cc..47dbf6d20e5 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -575,7 +575,10 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe lvs.transitionToOldGenGC() defer lvs.transitionToNoGC() - if gcs, ok := lvs.cs.(chunks.GenerationalCS); ok { + gcs, gcsOK := lvs.cs.(chunks.GenerationalCS) + collector, collectorOK := lvs.cs.(chunks.ChunkStoreGarbageCollector) + + if gcsOK && collectorOK { oldGen := gcs.OldGen() newGen := gcs.NewGen() @@ -589,20 +592,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return fmt.Errorf("unsupported GCMode %v", mode) } - err := newGen.BeginGC(lvs.gcAddChunk) + err := collector.BeginGC(lvs.gcAddChunk) if err != nil { return err } root, err := lvs.Root(ctx) if err != nil { - newGen.EndGC() + collector.EndGC() return err } if root == (hash.Hash{}) { // empty root - newGen.EndGC() + collector.EndGC() return nil } @@ -614,20 +617,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe newGenRefs.Insert(root) var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer - oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, newGen, oldGen, nil, func() hash.HashSet { + oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet { n := lvs.transitionToNewGenGC() newGenRefs.InsertAll(n) return make(hash.HashSet) }) if err != nil { - newGen.EndGC() + collector.EndGC() return err } var newFileHasMany chunks.HasManyF newFileHasMany, err = oldGenFinalizer.AddChunksToStore(ctx) if err != nil { - newGen.EndGC() + collector.EndGC() return err } @@ -637,28 +640,28 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe oldGenHasMany = newFileHasMany } - newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, newGen, newGen, safepointF, lvs.transitionToFinalizingGC) + newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC) if err != nil { - newGen.EndGC() + collector.EndGC() return err } if mode == GCModeFull { err = oldGenFinalizer.SwapChunksInStore(ctx) if err != nil { - newGen.EndGC() + collector.EndGC() return err } } err = newGenFinalizer.SwapChunksInStore(ctx) if err != nil { - newGen.EndGC() + collector.EndGC() return err } - newGen.EndGC() - } else if collector, ok := lvs.cs.(chunks.ChunkStoreGarbageCollector); ok { + collector.EndGC() + } else if collectorOK { extraNewGenRefs := lvs.transitionToNewGenGC() newGenRefs.InsertAll(extraNewGenRefs) newGenRefs.InsertAll(oldGenRefs) From 0905d4842734c293cd27415f58ca3c285e0b74b4 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 02:51:03 -0700 Subject: [PATCH 08/12] integration-tests/bats: garbage_collection.bats: Add a test asserting that dolt gc --full removes data from old gen which is now unreachable. --- .../bats/garbage_collection.bats | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/integration-tests/bats/garbage_collection.bats b/integration-tests/bats/garbage_collection.bats index a9b9cdd63e0..321ce6ac74c 100644 --- a/integration-tests/bats/garbage_collection.bats +++ b/integration-tests/bats/garbage_collection.bats @@ -404,3 +404,75 @@ SQL echo "$AFTER" [ "$BEFORE" -gt "$AFTER" ] } + +@test "garbage_collection: dolt gc --full" { + # Create a lot of data on a new branch. + dolt checkout -b to_keep + dolt sql -q "CREATE TABLE vals (val LONGTEXT);" + str="hex(random_bytes(1024))" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + + dolt commit -Am 'create some data on a new commit.' + + # Create a lot of data on another new branch. + dolt checkout main + dolt checkout -b to_delete + dolt sql -q "CREATE TABLE vals (val LONGTEXT);" + str="hex(random_bytes(1024))" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + str="$str,$str" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + dolt sql -q "INSERT INTO vals VALUES (concat($str));" + + dolt commit -Am 'create some data on a new commit.' + + # GC it into the old gen. + dolt gc + + # Get repository size. Note, this is in 512 byte blocks. + BEFORE=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') + + # Delete the branch with all the data. + dolt checkout main + dolt branch -D to_delete + + # Check that a regular GC does not delete this data. + dolt gc + AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') + [ $(($BEFORE - $AFTER)) -lt 16 ] + + # Check that a full GC does delete this data. + # NOTE: We create and drop the tmp table here to get around Dolt's "GC is + # a no-op if there have been no writes since the last GC" check. + dolt sql -q 'create table tmp (id int); drop table tmp;' + dolt gc --full + AFTER=$(du -c .dolt/noms/ | grep total | sed 's/[^0-9]*//g') + [ $(($BEFORE - $AFTER)) -gt 8192 ] # Reclaim at least 4MBs, in 512-byte blocks. + + # Sanity check that the stuff on to_keep is still accessible. + dolt checkout to_keep + dolt sql -q 'select length(val) from vals;' +} From 88a9a34819a55c3344598c9fe416e6bc9d24d8cc Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 03:07:47 -0700 Subject: [PATCH 09/12] go/store/{nbs,types}: Small cleanups for gc --full. --- .../doltcore/sqle/dprocedures/dolt_gc.go | 4 ++-- go/store/nbs/store.go | 1 - go/store/types/value_store.go | 24 +++++++++---------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 209457912da..4a28a3fe00c 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -73,7 +73,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { } if apr.NArg() != 0 { - return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr) + return cmdFailure, InvalidArgErr } dSess := dsess.DSessFromSess(ctx.Session) @@ -83,7 +83,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { } if apr.Contains(cli.ShallowFlag) && apr.Contains(cli.FullFlag) { - return cmdFailure, InvalidArgErr + return cmdFailure, fmt.Errorf("cannot supply both --shallow and --full to dolt_gc: %w", InvalidArgErr) } if apr.Contains(cli.ShallowFlag) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 76715710bd9..8c903ad75aa 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1658,7 +1658,6 @@ type gcFinalizer struct { } func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyF, error) { - // TODO: HasManyF fileIdToNumChunks := tableSpecsToMap(gcf.specs) var addrs []hash.Hash for _, spec := range gcf.specs { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 47dbf6d20e5..d9fd35d2078 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -584,12 +584,12 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe var oldGenHasMany chunks.HasManyF switch mode { - case GCModeDefault: - oldGenHasMany = oldGen.HasMany - case GCModeFull: - oldGenHasMany = unfilteredHashFunc - default: - return fmt.Errorf("unsupported GCMode %v", mode) + case GCModeDefault: + oldGenHasMany = oldGen.HasMany + case GCModeFull: + oldGenHasMany = unfilteredHashFunc + default: + return fmt.Errorf("unsupported GCMode %v", mode) } err := collector.BeginGC(lvs.gcAddChunk) @@ -646,6 +646,12 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return err } + err = newGenFinalizer.SwapChunksInStore(ctx) + if err != nil { + collector.EndGC() + return err + } + if mode == GCModeFull { err = oldGenFinalizer.SwapChunksInStore(ctx) if err != nil { @@ -654,12 +660,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe } } - err = newGenFinalizer.SwapChunksInStore(ctx) - if err != nil { - collector.EndGC() - return err - } - collector.EndGC() } else if collectorOK { extraNewGenRefs := lvs.transitionToNewGenGC() From 077294fafad41e608d9ab571cd778e2f08b4e598 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 03:59:05 -0700 Subject: [PATCH 10/12] go/store/types: Fix `dolt gc` for the archive use case, even if archives will still break `dolt gc --full`. --- go/store/types/value_store.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index d9fd35d2078..53ca24849b0 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -585,6 +585,12 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe var oldGenHasMany chunks.HasManyF switch mode { case GCModeDefault: + // We use the newGen directly as a the collector here, + // since all chunks we will ever source come from + // newGen. This allows dolt gc to continue working with + // old gens that contain archive files, whereas |dolt + // gc --full| does not currently support them. + collector = newGen oldGenHasMany = oldGen.HasMany case GCModeFull: oldGenHasMany = unfilteredHashFunc From 6cfac1b332c6a15723766003ca6c10731b17304c Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 08:55:57 -0700 Subject: [PATCH 11/12] go/store/{types,nbs}: Make archive_file_reader support getManyCompressed by just fetching the chunks and re-compressing them. This makes `dolt gc` work while sourcing the chunks from the generational store and makes `dolt gc --full` work against databases which have been previously archived. `dolt gc --full` will still completely undo the archiving in the process of running the garbage collection. --- go/store/nbs/archive_chunk_source.go | 4 +++- go/store/types/value_store.go | 6 ------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/go/store/nbs/archive_chunk_source.go b/go/store/nbs/archive_chunk_source.go index 41221a1aff2..d637cba4093 100644 --- a/go/store/nbs/archive_chunk_source.go +++ b/go/store/nbs/archive_chunk_source.go @@ -151,7 +151,9 @@ func (acs archiveChunkSource) getRecordRanges(_ context.Context, _ []getRecord) } func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgroup.Group, reqs []getRecord, found func(context.Context, CompressedChunk), stats *Stats) (bool, error) { - return false, errors.New("Archive chunk source does not support getManyCompressed") + return acs.getMany(ctx, eg, reqs, func(ctx context.Context, chk *chunks.Chunk) { + found(ctx, ChunkToCompressedChunk(*chk)) + }, stats) } func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk)) error { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 53ca24849b0..d9fd35d2078 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -585,12 +585,6 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe var oldGenHasMany chunks.HasManyF switch mode { case GCModeDefault: - // We use the newGen directly as a the collector here, - // since all chunks we will ever source come from - // newGen. This allows dolt gc to continue working with - // old gens that contain archive files, whereas |dolt - // gc --full| does not currently support them. - collector = newGen oldGenHasMany = oldGen.HasMany case GCModeFull: oldGenHasMany = unfilteredHashFunc From 0a53805d3900f6d0e26048442a3c8147ba07afd4 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 17 Oct 2024 11:39:16 -0700 Subject: [PATCH 12/12] go/store/{nbs,chunks,types}: PR feedback. --- go/store/chunks/chunk_store.go | 6 +- go/store/chunks/memory_store.go | 2 +- go/store/nbs/store.go | 6 +- go/store/nbs/table_set.go | 9 ++ go/store/types/value_store.go | 170 ++++++++++++++++---------------- 5 files changed, 103 insertions(+), 90 deletions(-) diff --git a/go/store/chunks/chunk_store.go b/go/store/chunks/chunk_store.go index 17143c100e2..a074529ff7e 100644 --- a/go/store/chunks/chunk_store.go +++ b/go/store/chunks/chunk_store.go @@ -152,7 +152,9 @@ type LoggingChunkStore interface { var ErrAddChunkMustBlock = errors.New("chunk keeper: add chunk must block") -type HasManyF func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) +// The function type for ChunkStore.HasMany. Used as a return value in the +// GCFinalizer interface. +type HasManyFunc func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSet, err error) // A GCFinalizer is returned from MarkAndSweepChunks after the keep hashes channel is closed. // @@ -174,7 +176,7 @@ type HasManyF func(ctx context.Context, hashes hash.HashSet) (absent hash.HashSe // * Swap to the new gen table file. // * Swap to the old gen table file. type GCFinalizer interface { - AddChunksToStore(ctx context.Context) (HasManyF, error) + AddChunksToStore(ctx context.Context) (HasManyFunc, error) SwapChunksInStore(ctx context.Context) error } diff --git a/go/store/chunks/memory_store.go b/go/store/chunks/memory_store.go index 38aa82ee024..2af154312f7 100644 --- a/go/store/chunks/memory_store.go +++ b/go/store/chunks/memory_store.go @@ -348,7 +348,7 @@ type msvGcFinalizer struct { keepers map[hash.Hash]Chunk } -func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyF, error) { +func (mgcf msvGcFinalizer) AddChunksToStore(ctx context.Context) (HasManyFunc, error) { panic("unsupported") } diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 8c903ad75aa..054d0c57542 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1037,7 +1037,7 @@ func (nbs *NomsBlockStore) HasMany(ctx context.Context, hashes hash.HashSet) (ha return nbs.hasMany(toHasRecords(hashes)) } -func (nbs *NomsBlockStore) hasManyInSources(ctx context.Context, srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) { +func (nbs *NomsBlockStore) hasManyInSources(srcs []hash.Hash, hashes hash.HashSet) (hash.HashSet, error) { if hashes.Size() == 0 { return nil, nil } @@ -1657,14 +1657,14 @@ type gcFinalizer struct { specs []tableSpec } -func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyF, error) { +func (gcf gcFinalizer) AddChunksToStore(ctx context.Context) (chunks.HasManyFunc, error) { fileIdToNumChunks := tableSpecsToMap(gcf.specs) var addrs []hash.Hash for _, spec := range gcf.specs { addrs = append(addrs, spec.name) } f := func(ctx context.Context, hashes hash.HashSet) (hash.HashSet, error) { - return gcf.nbs.hasManyInSources(ctx, addrs, hashes) + return gcf.nbs.hasManyInSources(addrs, hashes) } return f, gcf.nbs.AddTableFilesToManifest(ctx, fileIdToNumChunks) } diff --git a/go/store/nbs/table_set.go b/go/store/nbs/table_set.go index e58acbe6386..185743199a0 100644 --- a/go/store/nbs/table_set.go +++ b/go/store/nbs/table_set.go @@ -115,6 +115,15 @@ func (ts tableSet) hasMany(addrs []hasRecord) (bool, error) { return f(ts.upstream) } +// Updates the records in |addrs| for whether they exist in this table set, but +// only consults tables whose names appear in |srcs|, ignoring all other tables +// in the table set. Returns |remaining| as true if all addresses were not +// found in the consulted tables, and false otherwise. +// +// Intended to be exactly like |hasMany|, except filtering for the files +// consulted. Only used for part of the GC workflow where we want to have +// access to all chunks in the store but need to check for existing chunk +// presence in only a subset of its files. func (ts tableSet) hasManyInSources(srcs []hash.Hash, addrs []hasRecord) (remaining bool, err error) { for _, rec := range addrs { if !rec.has { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index d9fd35d2078..2fba7a75b83 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -582,7 +582,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe oldGen := gcs.OldGen() newGen := gcs.NewGen() - var oldGenHasMany chunks.HasManyF + var oldGenHasMany chunks.HasManyFunc switch mode { case GCModeDefault: oldGenHasMany = oldGen.HasMany @@ -592,113 +592,115 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe return fmt.Errorf("unsupported GCMode %v", mode) } - err := collector.BeginGC(lvs.gcAddChunk) - if err != nil { - return err - } - - root, err := lvs.Root(ctx) - if err != nil { - collector.EndGC() - return err - } + err := func() error { + err := collector.BeginGC(lvs.gcAddChunk) + if err != nil { + return err + } + defer collector.EndGC() - if root == (hash.Hash{}) { - // empty root - collector.EndGC() - return nil - } + root, err := lvs.Root(ctx) + if err != nil { + return err + } - oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs) - if err != nil { - return err - } + if root == (hash.Hash{}) { + // empty root + return nil + } - newGenRefs.Insert(root) + oldGenRefs, err = oldGenHasMany(ctx, oldGenRefs) + if err != nil { + return err + } - var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer - oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet { - n := lvs.transitionToNewGenGC() - newGenRefs.InsertAll(n) - return make(hash.HashSet) - }) - if err != nil { - collector.EndGC() - return err - } + newGenRefs.Insert(root) - var newFileHasMany chunks.HasManyF - newFileHasMany, err = oldGenFinalizer.AddChunksToStore(ctx) - if err != nil { - collector.EndGC() - return err - } + var oldGenFinalizer, newGenFinalizer chunks.GCFinalizer + oldGenFinalizer, err = lvs.gc(ctx, oldGenRefs, oldGenHasMany, collector, oldGen, nil, func() hash.HashSet { + n := lvs.transitionToNewGenGC() + newGenRefs.InsertAll(n) + return make(hash.HashSet) + }) + if err != nil { + return err + } - if mode == GCModeDefault { - oldGenHasMany = oldGen.HasMany - } else { - oldGenHasMany = newFileHasMany - } + var newFileHasMany chunks.HasManyFunc + newFileHasMany, err = oldGenFinalizer.AddChunksToStore(ctx) + if err != nil { + return err + } - newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC) - if err != nil { - collector.EndGC() - return err - } + if mode == GCModeDefault { + oldGenHasMany = oldGen.HasMany + } else { + oldGenHasMany = newFileHasMany + } - err = newGenFinalizer.SwapChunksInStore(ctx) - if err != nil { - collector.EndGC() - return err - } + newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, collector, newGen, safepointF, lvs.transitionToFinalizingGC) + if err != nil { + return err + } - if mode == GCModeFull { - err = oldGenFinalizer.SwapChunksInStore(ctx) + err = newGenFinalizer.SwapChunksInStore(ctx) if err != nil { - collector.EndGC() return err } - } - collector.EndGC() + if mode == GCModeFull { + err = oldGenFinalizer.SwapChunksInStore(ctx) + if err != nil { + return err + } + } + + return nil + }() + + if err != nil { + return err + } } else if collectorOK { extraNewGenRefs := lvs.transitionToNewGenGC() newGenRefs.InsertAll(extraNewGenRefs) newGenRefs.InsertAll(oldGenRefs) - err := collector.BeginGC(lvs.gcAddChunk) - if err != nil { - return err - } + err := func() error { + err := collector.BeginGC(lvs.gcAddChunk) + if err != nil { + return err + } - root, err := lvs.Root(ctx) - if err != nil { - collector.EndGC() - return err - } + root, err := lvs.Root(ctx) + if err != nil { + return err + } - if root == (hash.Hash{}) { - // empty root - collector.EndGC() - return nil - } + if root == (hash.Hash{}) { + // empty root + return nil + } - newGenRefs.Insert(root) + newGenRefs.Insert(root) - var finalizer chunks.GCFinalizer - finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC) - if err != nil { - collector.EndGC() - return err - } + var finalizer chunks.GCFinalizer + finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, collector, collector, safepointF, lvs.transitionToFinalizingGC) + if err != nil { + return err + } + + err = finalizer.SwapChunksInStore(ctx) + if err != nil { + return err + } + + return nil + }() - err = finalizer.SwapChunksInStore(ctx) if err != nil { - collector.EndGC() return err } - - collector.EndGC() } else { return chunks.ErrUnsupportedOperation } @@ -716,7 +718,7 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe func (lvs *ValueStore) gc(ctx context.Context, toVisit hash.HashSet, - hashFilter chunks.HasManyF, + hashFilter chunks.HasManyFunc, src, dest chunks.ChunkStoreGarbageCollector, safepointF func() error, finalize func() hash.HashSet) (chunks.GCFinalizer, error) { @@ -772,7 +774,7 @@ func (lvs *ValueStore) gc(ctx context.Context, func (lvs *ValueStore) gcProcessRefs(ctx context.Context, initialToVisit hash.HashSet, keepHashes func(hs []hash.Hash) error, - walker *parallelRefWalker, hashFilter chunks.HasManyF, + walker *parallelRefWalker, hashFilter chunks.HasManyFunc, safepointF func() error, finalize func() hash.HashSet) error { visited := make(hash.HashSet)