Skip to content

Commit

Permalink
lightning, ddl: set TS to engineMeta after ResetEngineSkipAllocTS (pi…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and lance6716 committed Dec 6, 2024
1 parent a7df4f9 commit f54c476
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 15 deletions.
3 changes: 1 addition & 2 deletions pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error {
if err != nil {
return errors.Trace(err)
}
ei.openedEngine.SetTS(newTS)
return nil
return bc.backend.SetTSAfterResetEngine(ei.uuid, newTS)
}

// ForceSyncFlagForTest is a flag to force sync only for test.
Expand Down
12 changes: 4 additions & 8 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ type EngineConfig struct {
// when opening the engine, instead of removing it.
KeepSortDir bool
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
// will be set lazily.
// will be set lazily. This is used by local backend. This field will be written
// to engineMeta.TS and take effect in below cases:
// - engineManager.openEngine
// - engineManager.closeEngine only for an external engine
TS uint64
}

Expand Down Expand Up @@ -307,13 +310,6 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
}

// SetTS sets the TS of the engine. In most cases if the caller wants to specify
// TS it should use the TS field in EngineConfig. This method is only used after
// a ResetEngine.
func (engine *OpenedEngine) SetTS(ts uint64) {
engine.config.TS = ts
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
Expand Down
1 change: 1 addition & 0 deletions pkg/lightning/backend/local/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ go_library(
"//pkg/util/compress",
"//pkg/util/engine",
"//pkg/util/hack",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/mathutil",
"//pkg/util/ranger",
Expand Down
6 changes: 6 additions & 0 deletions pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
Expand Down Expand Up @@ -68,6 +70,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// simulate import
f.lock(importMutexStateImport)
Expand Down Expand Up @@ -106,6 +109,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
Expand Down Expand Up @@ -142,6 +146,7 @@ func TestGetFirstAndLastKey(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -184,6 +189,7 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down
15 changes: 14 additions & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1518,12 +1518,25 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
}

// ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is
// invalid. Caller must use OpenedEngine.SetTS to set a valid TS before import
// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import
// the engine.
func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.resetEngine(ctx, engineUUID, true)
}

// SetTSAfterResetEngine allocates a new TS for the engine after it's reset.
// This is typically called after persisting the chosen TS of the engine to make
// sure TS is not changed after task failover.
func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error {
e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose)
if e == nil {
return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String())
}
defer e.unlock()
e.engineMeta.TS = ts
return e.saveEngineMeta()
}

// CleanupEngine cleanup the engine and reclaim the space.
func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.cleanupEngine(ctx, engineUUID)
Expand Down
7 changes: 6 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
"google.golang.org/grpc"
Expand Down Expand Up @@ -343,6 +344,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
f.wg.Add(1)
Expand Down Expand Up @@ -589,6 +591,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)

createSSTWriter := func() (*sstWriter, error) {
Expand Down Expand Up @@ -1211,7 +1214,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
}

func (m mockIngestData) GetTS() uint64 { return 0 }
func (m mockIngestData) GetTS() uint64 { return oracle.GoTimeToTS(time.Now()) }

func (m mockIngestData) IncRef() {}

Expand Down Expand Up @@ -1599,6 +1602,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err = db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1740,6 +1744,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// keys starts with 0 is meta keys, so we start with 1.
for i := byte(1); i <= 10; i++ {
Expand Down
16 changes: 16 additions & 0 deletions pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/lightning/metric"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -320,6 +322,20 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
intest.AssertFunc(func() bool {
timeOfTS := oracle.GetTimeFromTS(dataCommitTS)
now := time.Now()
if timeOfTS.After(now) {
return false
}
if now.Sub(timeOfTS) > 24*time.Hour {
return false
}
return true
}, "TS used in import should in [now-1d, now], but got %d", dataCommitTS)
if dataCommitTS == 0 {
return errors.New("data commitTS is 0")
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: dataCommitTS,
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ go_test(
"@com_github_phayes_freeport//:freeport",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
],
)
12 changes: 9 additions & 3 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/phayes/freeport"
Expand All @@ -38,6 +39,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func init() {
Expand Down Expand Up @@ -313,7 +315,11 @@ func TestIngestUseGivenTS(t *testing.T) {
t.Cleanup(func() {
tk.MustExec("set @@global.tidb_cloud_storage_uri = '';")
})
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", `return(123456789)`)

presetTS := oracle.GoTimeToTS(time.Now())
failpointTerm := fmt.Sprintf(`return(%d)`, presetTS)

err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", failpointTerm)
require.NoError(t, err)

tk.MustExec("create table t (a int);")
Expand All @@ -330,10 +336,10 @@ func TestIngestUseGivenTS(t *testing.T) {
require.NoError(t, err)
tikvStore := dom.Store().(helper.Storage)
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 123456789)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, presetTS)
require.NoError(t, err)
require.NotNil(t, mvccResp)
require.NotNil(t, mvccResp.Info)
require.Greater(t, len(mvccResp.Info.Writes), 0)
require.Equal(t, uint64(123456789), mvccResp.Info.Writes[0].CommitTs)
require.Equal(t, presetTS, mvccResp.Info.Writes[0].CommitTs)
}
30 changes: 30 additions & 0 deletions tests/realtikvtest/addindextest4/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,36 @@ func TestAddIndexFinishImportError(t *testing.T) {
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexDiskQuotaTS(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
testAddIndexDiskQuotaTS(t, tk)
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
testAddIndexDiskQuotaTS(t, tk)
// reset changed global variable
tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
}

func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) {
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set global tidb_ddl_reorg_worker_cnt=1;")

tk.MustExec("create table t(id int primary key, b int, k int);")
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))
tk.MustExec("insert into t values(1, 1, 1);")
tk.MustExec("insert into t values(100000, 1, 1);")

ingest.ForceSyncFlagForTest = true
tk.MustExec("alter table t add index idx_test(b);")
ingest.ForceSyncFlagForTest = false
tk.MustExec("update t set b = b + 1;")
}

func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit f54c476

Please sign in to comment.