Skip to content

Commit

Permalink
Merge #55756
Browse files Browse the repository at this point in the history
55756: schemachange/tenant: destroy tenant data with GC job r=spaskob a=spaskob

Fixes #48775.

This PR adds tenant GC fields to schema change Details and
Progress protos and adds support for deleting tenant's data
via the schema change GC job.

Release note: none.

Co-authored-by: Spas Bojanov <spas@cockroachlabs.com>
  • Loading branch information
craig[bot] and Spas Bojanov committed Jan 16, 2021
2 parents ff1442b + 161b3fb commit fb03099
Show file tree
Hide file tree
Showing 17 changed files with 1,444 additions and 461 deletions.
28 changes: 28 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6294,13 +6294,25 @@ func TestBackupRestoreTenant(t *testing.T) {
ten10Stopper.Stop(ctx)
restoreConn10 = nil

// Mark tenant as DROP.
restoreDB.Exec(t, `SELECT crdb_internal.destroy_tenant(10)`)
restoreDB.CheckQueryResults(t,
`select id, active, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info) from system.tenants`,
[][]string{{`10`, `false`, `{"id": "10", "state": "DROP"}`}},
)

// Make GC jobs run in 1 second.
restoreDB.Exec(t, "SET CLUSTER SETTING kv.range_merge.queue_enabled = false")
restoreDB.Exec(t, "ALTER RANGE tenants CONFIGURE ZONE USING gc.ttlseconds = 1;")
// Now run the GC job to delete the tenant and its data.
restoreDB.Exec(t, `SELECT crdb_internal.gc_tenant(10)`)
// Wait for tenant GC job to complete.
restoreDB.CheckQueryResultsRetry(
t,
"SELECT status FROM [SHOW JOBS] WHERE description = 'GC for tenant 10'",
[][]string{{"succeeded"}},
)

ten10Prefix := keys.MakeTenantPrefix(roachpb.MakeTenantID(10))
ten10PrefixEnd := ten10Prefix.PrefixEnd()
rows, err := restoreTC.Server(0).DB().Scan(ctx, ten10Prefix, ten10PrefixEnd, 0 /* maxRows */)
Expand All @@ -6323,6 +6335,22 @@ func TestBackupRestoreTenant(t *testing.T) {

restoreTenant10.CheckQueryResults(t, `select * from foo.bar`, tenant10.QueryStr(t, `select * from foo.bar`))
restoreTenant10.CheckQueryResults(t, `select * from foo.bar2`, tenant10.QueryStr(t, `select * from foo.bar2`))

restoreDB.Exec(t, `SELECT crdb_internal.destroy_tenant(10)`)
restoreDB.Exec(t, `SELECT crdb_internal.gc_tenant(10)`)
// Wait for tenant GC job to complete.
restoreDB.CheckQueryResultsRetry(
t,
"SELECT status FROM [SHOW JOBS] WHERE description = 'GC for tenant 10'",
[][]string{{"succeeded"}, {"succeeded"}},
)

restoreDB.CheckQueryResults(t, `select * from system.tenants`, [][]string{})
restoreDB.Exec(t, `RESTORE TENANT 10 FROM 'nodelocal://1/t10'`)
restoreDB.CheckQueryResults(t,
`select id, active, crdb_internal.pb_to_json('cockroach.sql.sqlbase.TenantInfo', info) from system.tenants`,
[][]string{{`10`, `true`, `{"id": "10", "state": "ACTIVE"}`}},
)
})

t.Run("restore-t10-from-cluster-backup", func(t *testing.T) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,9 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, execCtx interface{}
execCfg.DB, func(ctx context.Context, txn *kv.Txn, descsCol *descs.Collection) error {
for _, tenant := range details.Tenants {
tenant.State = descpb.TenantInfo_DROP
if err := sql.GCTenant(ctx, execCfg, &tenant); err != nil {
// This is already a job so no need to spin up a gc job for the tenant;
// instead just GC the data eagerly.
if err := sql.GCTenantSync(ctx, execCfg, &tenant); err != nil {
return err
}
}
Expand Down
1,269 changes: 839 additions & 430 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

18 changes: 18 additions & 0 deletions pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ message DroppedTableDetails {
// 3. Database deletions: The deletion of a database and therefore all its tables.
// details.Tables -> the IDs of the tables to GC.
// details.ParentID -> the ID of the database to drop.
//
// 4. Tenant deletion: The deletion of a tenant key range.
// details.TenantID -> the ID of the tenant to delete.
message SchemaChangeGCDetails {
message DroppedIndex {
int64 index_id = 1 [(gogoproto.customname) = "IndexID",
Expand Down Expand Up @@ -359,6 +362,14 @@ message SchemaChangeGCDetails {
// database, the database ID.
int64 parent_id = 3 [(gogoproto.customname) = "ParentID",
(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.ID"];

message DroppedTenant {
uint64 id = 1 [(gogoproto.customname) = "ID"];
int64 drop_time = 2;
}

// Tenant to GC.
DroppedTenant tenant = 6;
}

message SchemaChangeDetails {
Expand Down Expand Up @@ -427,11 +438,18 @@ message SchemaChangeGCProgress {
Status status = 2;
}

message TenantProgress {
Status status = 1;
}

// Indexes to GC.
repeated IndexProgress indexes = 1 [(gogoproto.nullable) = false];

// Entire tables to GC.
repeated TableProgress tables = 2 [(gogoproto.nullable) = false];

// The status of the tenant to be deleted.
TenantProgress tenant = 3;
}

message ChangefeedTarget {
Expand Down
33 changes: 32 additions & 1 deletion pkg/sql/gcjob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "gcjob",
Expand All @@ -9,6 +9,7 @@ go_library(
"index_garbage_collection.go",
"refresh_statuses.go",
"table_garbage_collection.go",
"tenant_garbage_collection.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/gcjob",
visibility = ["//visibility:public"],
Expand All @@ -30,9 +31,39 @@ go_library(
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/util/log",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "gcjob_test",
srcs = [
"helpers_test.go",
"main_test.go",
"tenant_garbage_collection_test.go",
],
embed = [":gcjob"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/descpb",
"//pkg/testutils/serverutils",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_stretchr_testify//require",
],
)
21 changes: 18 additions & 3 deletions pkg/sql/gcjob/gc_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package gcjob

import (
"context"
"math"
"time"

"github.com/cockroachdb/cockroach/pkg/jobs"
Expand Down Expand Up @@ -56,6 +57,12 @@ func performGC(
details *jobspb.SchemaChangeGCDetails,
progress *jobspb.SchemaChangeGCProgress,
) error {
if details.Tenant != nil {
return errors.Wrapf(
gcTenant(ctx, execCfg, details.Tenant.ID, progress),
"attempting to GC tenant %+v", details.Tenant,
)
}
if details.Indexes != nil {
return errors.Wrap(gcIndexes(ctx, execCfg, details.ParentID, progress), "attempting to GC indexes")
} else if details.Tables != nil {
Expand Down Expand Up @@ -130,9 +137,17 @@ func (r schemaChangeGCResumer) Resume(
return ctx.Err()
}

// Refresh the status of all tables in case any GC TTLs have changed.
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline := refreshTables(ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress)
// Refresh the status of all elements in case any GC TTLs have changed.
var expired bool
earliestDeadline := timeutil.Unix(0, math.MaxInt64)
if details.Tenant == nil {
remainingTables := getAllTablesWaitingForGC(details, progress)
expired, earliestDeadline = refreshTables(
ctx, execCfg, remainingTables, tableDropTimes, indexDropTimes, r.jobID, progress,
)
} else {
expired, earliestDeadline = refreshTenant(ctx, execCfg, details.Tenant.DropTime, details, progress)
}
timerDuration := time.Until(earliestDeadline)

if expired {
Expand Down
21 changes: 19 additions & 2 deletions pkg/sql/gcjob/gc_job_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,23 @@ func initializeProgress(
details *jobspb.SchemaChangeGCDetails,
progress *jobspb.SchemaChangeGCProgress,
) error {
if len(progress.Tables) != len(details.Tables) || len(progress.Indexes) != len(details.Indexes) {
var update bool
if details.Tenant != nil {
progress.Tenant = &jobspb.SchemaChangeGCProgress_TenantProgress{
Status: jobspb.SchemaChangeGCProgress_WAITING_FOR_GC,
}
update = true
} else if len(progress.Tables) != len(details.Tables) || len(progress.Indexes) != len(details.Indexes) {
update = true
for _, table := range details.Tables {
progress.Tables = append(progress.Tables, jobspb.SchemaChangeGCProgress_TableProgress{ID: table.ID})
}
for _, index := range details.Indexes {
progress.Indexes = append(progress.Indexes, jobspb.SchemaChangeGCProgress_IndexProgress{IndexID: index.IndexID})
}
}

// Write out new progress.
if update {
if err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
job, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobID, txn)
if err != nil {
Expand All @@ -127,6 +135,9 @@ func isDoneGC(progress *jobspb.SchemaChangeGCProgress) bool {
return false
}
}
if progress.Tenant != nil && progress.Tenant.Status != jobspb.SchemaChangeGCProgress_DELETED {
return false
}

return true
}
Expand All @@ -153,6 +164,12 @@ func getAllTablesWaitingForGC(
// validateDetails ensures that the job details payload follows the structure
// described in the comment for SchemaChangeGCDetails.
func validateDetails(details *jobspb.SchemaChangeGCDetails) error {
if details.Tenant != nil &&
(len(details.Tables) > 0 || len(details.Indexes) > 0) {
return errors.AssertionFailedf(
"Either field Tenant is set or any of Tables or Indexes: %+v", *details,
)
}
if len(details.Indexes) > 0 {
if details.ParentID == descpb.InvalidID {
return errors.Errorf("must provide a parentID when dropping an index")
Expand Down
28 changes: 28 additions & 0 deletions pkg/sql/gcjob/helpers_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package gcjob

import (
"context"

"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/sql"
)

// GcTenant is a wrapper around the internal function that gc-s a tenant.
func GcTenant(
ctx context.Context,
execCfg *sql.ExecutorConfig,
tenID uint64,
progress *jobspb.SchemaChangeGCProgress,
) error {
return gcTenant(ctx, execCfg, tenID, progress)
}
29 changes: 29 additions & 0 deletions pkg/sql/gcjob/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package gcjob_test

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
randutil.SeedForTests()
serverutils.InitTestServerFactory(server.TestServerFactory)
os.Exit(m.Run())
}
28 changes: 28 additions & 0 deletions pkg/sql/gcjob/refresh_statuses.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
Expand Down Expand Up @@ -274,3 +275,30 @@ func isProtected(
})
return protected
}

// refreshTenant updates the status of tenant that is waiting to be GC'd. It
// returns whether or the tenant has expired or the duration until it expires.
func refreshTenant(
ctx context.Context,
execCfg *sql.ExecutorConfig,
dropTime int64,
details *jobspb.SchemaChangeGCDetails,
progress *jobspb.SchemaChangeGCProgress,
) (expired bool, deadline time.Time) {
tenantTTLSeconds := execCfg.DefaultZoneConfig.GC.TTLSeconds
tenID := details.Tenant.ID
cfg := execCfg.SystemConfig.GetSystemConfig()
zoneCfg, err := cfg.GetZoneConfigForObject(keys.MakeSQLCodec(roachpb.MakeTenantID(tenID)), 0)
if err == nil {
tenantTTLSeconds = zoneCfg.GC.TTLSeconds
} else {
log.Errorf(ctx, "zone config for tenants range: err = %+v", err)
}

deadlineNanos := dropTime + int64(tenantTTLSeconds)*time.Second.Nanoseconds()
if timeutil.Now().UnixNano() >= deadlineNanos {
progress.Tenant.Status = jobspb.SchemaChangeGCProgress_DELETING
return true, time.Time{}
}
return false, timeutil.Unix(0, deadlineNanos)
}
Loading

0 comments on commit fb03099

Please sign in to comment.