Skip to content

Commit

Permalink
lightning, ddl: set TS to engineMeta after ResetEngineSkipAllocTS (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Dec 5, 2024
1 parent 821563f commit 098213a
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 14 deletions.
5 changes: 4 additions & 1 deletion pkg/ddl/ingest/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ func (bc *litBackendCtx) Flush(ctx context.Context, mode FlushMode) (flushed, im
newTS, err = mgr.refreshTSAndUpdateCP()
if err == nil {
for _, ei := range bc.engines {
ei.openedEngine.SetTS(newTS)
err = bc.backend.SetTSAfterResetEngine(ei.uuid, newTS)
if err != nil {
return false, false, err
}
}
}
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ type EngineConfig struct {
// when opening the engine, instead of removing it.
KeepSortDir bool
// TS is the preset timestamp of data in the engine. When it's 0, the used TS
// will be set lazily.
// will be set lazily. This is used by local backend. This field will be written
// to engineMeta.TS and take effect in below cases:
// - engineManager.openEngine
// - engineManager.closeEngine only for an external engine
TS uint64
}

Expand Down Expand Up @@ -319,13 +322,6 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
return engine.backend.LocalWriter(ctx, cfg, engine.uuid)
}

// SetTS sets the TS of the engine. In most cases if the caller wants to specify
// TS it should use the TS field in EngineConfig. This method is only used after
// a ResetEngine.
func (engine *OpenedEngine) SetTS(ts uint64) {
engine.config.TS = ts
}

// UnsafeCloseEngine closes the engine without first opening it.
// This method is "unsafe" as it does not follow the normal operation sequence
// (Open -> Write -> Close -> Import). This method should only be used when one
Expand Down
6 changes: 6 additions & 0 deletions pkg/lightning/backend/local/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand All @@ -34,6 +35,7 @@ import (
"github.com/pingcap/tidb/pkg/lightning/common"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func makePebbleDB(t *testing.T, opt *pebble.Options) (*pebble.DB, string) {
Expand Down Expand Up @@ -68,6 +70,7 @@ func TestGetEngineSizeWhenImport(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// simulate import
f.lock(importMutexStateImport)
Expand Down Expand Up @@ -106,6 +109,7 @@ func TestIngestSSTWithClosedEngine(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
sstPath := path.Join(tmpPath, uuid.New().String()+".sst")
Expand Down Expand Up @@ -142,6 +146,7 @@ func TestGetFirstAndLastKey(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -184,6 +189,7 @@ func TestIterOutputHasUniqueMemorySpace(t *testing.T) {
f := &Engine{
sstDir: tmpPath,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err := db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down
15 changes: 14 additions & 1 deletion pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -1602,12 +1602,25 @@ func (local *Backend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) err
}

// ResetEngineSkipAllocTS is like ResetEngine but the inner TS of the engine is
// invalid. Caller must use OpenedEngine.SetTS to set a valid TS before import
// invalid. Caller must use SetTSAfterResetEngine to set a valid TS before import
// the engine.
func (local *Backend) ResetEngineSkipAllocTS(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.resetEngine(ctx, engineUUID, true)
}

// SetTSAfterResetEngine allocates a new TS for the engine after it's reset.
// This is typically called after persisting the chosen TS of the engine to make
// sure TS is not changed after task failover.
func (local *Backend) SetTSAfterResetEngine(engineUUID uuid.UUID, ts uint64) error {
e := local.engineMgr.lockEngine(engineUUID, importMutexStateClose)
if e == nil {
return errors.Errorf("engine %s not found in SetTSAfterResetEngine", engineUUID.String())
}
defer e.unlock()
e.engineMeta.TS = ts
return e.saveEngineMeta()
}

// CleanupEngine cleanup the engine and reclaim the space.
func (local *Backend) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error {
return local.engineMgr.cleanupEngine(ctx, engineUUID)
Expand Down
7 changes: 6 additions & 1 deletion pkg/lightning/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"github.com/pingcap/tidb/pkg/util/hack"
"github.com/pingcap/tidb/pkg/util/mathutil"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/http"
Expand Down Expand Up @@ -353,6 +354,7 @@ func testLocalWriter(t *testing.T, needSort bool, partitialSort bool) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
f.sstIngester = dbSSTIngester{e: f}
f.wg.Add(1)
Expand Down Expand Up @@ -587,6 +589,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)

createSSTWriter := func() (*sstWriter, error) {
Expand Down Expand Up @@ -1176,7 +1179,7 @@ func (m mockIngestData) NewIter(_ context.Context, lowerBound, upperBound []byte
return &mockIngestIter{data: m, startIdx: i, endIdx: j, curIdx: i}
}

func (m mockIngestData) GetTS() uint64 { return 0 }
func (m mockIngestData) GetTS() uint64 { return oracle.GoTimeToTS(time.Now()) }

func (m mockIngestData) IncRef() {}

Expand Down Expand Up @@ -1565,6 +1568,7 @@ func TestPartialWriteIngestBusy(t *testing.T) {
keyAdapter: common.NoopKeyAdapter{},
logger: log.L(),
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
err = db.Set([]byte("a"), []byte("a"), nil)
require.NoError(t, err)
Expand Down Expand Up @@ -1708,6 +1712,7 @@ func TestSplitRangeAgain4BigRegion(t *testing.T) {
regionSplitKeysCache: [][]byte{{1}, {11}},
regionSplitSize: 1 << 30,
}
f.TS = oracle.GoTimeToTS(time.Now())
f.db.Store(db)
// keys starts with 0 is meta keys, so we start with 1.
for i := byte(1); i <= 10; i++ {
Expand Down
15 changes: 15 additions & 0 deletions pkg/lightning/backend/local/region_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
util2 "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/codec"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -436,6 +437,20 @@ func (local *Backend) doWrite(ctx context.Context, j *regionJob) error {
allPeers = append(allPeers, peer)
}
dataCommitTS := j.ingestData.GetTS()
intest.AssertFunc(func() bool {
timeOfTS := oracle.GetTimeFromTS(dataCommitTS)
now := time.Now()
if timeOfTS.After(now) {
return false
}
if now.Sub(timeOfTS) > 24*time.Hour {
return false
}
return true
}, "TS used in import should in [now-1d, now], but got %d", dataCommitTS)
if dataCommitTS == 0 {
return errors.New("data commitTS is 0")
}
req.Chunk = &sst.WriteRequest_Batch{
Batch: &sst.WriteBatch{
CommitTs: dataCommitTS,
Expand Down
1 change: 1 addition & 0 deletions tests/realtikvtest/addindextest2/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ go_test(
"@com_github_phayes_freeport//:freeport",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//oracle",
],
)
12 changes: 9 additions & 3 deletions tests/realtikvtest/addindextest2/global_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/phayes/freeport"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/tests/realtikvtest"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
)

func init() {
Expand Down Expand Up @@ -320,7 +322,11 @@ func TestIngestUseGivenTS(t *testing.T) {
t.Cleanup(func() {
tk.MustExec("set @@global.tidb_cloud_storage_uri = '';")
})
err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", `return(123456789)`)

presetTS := oracle.GoTimeToTS(time.Now())
failpointTerm := fmt.Sprintf(`return(%d)`, presetTS)

err = failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockTSForGlobalSort", failpointTerm)
require.NoError(t, err)

tk.MustExec("create table t (a int);")
Expand All @@ -336,10 +342,10 @@ func TestIngestUseGivenTS(t *testing.T) {
require.NoError(t, err)
tikvStore := dom.Store().(helper.Storage)
newHelper := helper.NewHelper(tikvStore)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, 123456789)
mvccResp, err := newHelper.GetMvccByEncodedKeyWithTS(idxKey, presetTS)
require.NoError(t, err)
require.NotNil(t, mvccResp)
require.NotNil(t, mvccResp.Info)
require.Greater(t, len(mvccResp.Info.Writes), 0)
require.Equal(t, uint64(123456789), mvccResp.Info.Writes[0].CommitTs)
require.Equal(t, presetTS, mvccResp.Info.Writes[0].CommitTs)
}
28 changes: 28 additions & 0 deletions tests/realtikvtest/addindextest3/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,34 @@ func TestAddIndexMockFlushError(t *testing.T) {
require.True(t, strings.Contains(jobTp, "ingest"), jobTp)
}

func TestAddIndexDiskQuotaTS(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)

tk.MustExec("set @@global.tidb_enable_dist_task = 0;")
testAddIndexDiskQuotaTS(t, tk)
tk.MustExec("set @@global.tidb_enable_dist_task = 1;")
testAddIndexDiskQuotaTS(t, tk)
}

func testAddIndexDiskQuotaTS(t *testing.T, tk *testkit.TestKit) {
tk.MustExec("drop database if exists addindexlit;")
tk.MustExec("create database addindexlit;")
tk.MustExec("use addindexlit;")
tk.MustExec(`set global tidb_ddl_enable_fast_reorg=on;`)
tk.MustExec("set @@tidb_ddl_reorg_worker_cnt=1;")

tk.MustExec("create table t(id int primary key, b int, k int);")
tk.MustQuery("split table t by (30000);").Check(testkit.Rows("1 1"))
tk.MustExec("insert into t values(1, 1, 1);")
tk.MustExec("insert into t values(100000, 1, 1);")

ingest.ForceSyncFlagForTest = true
tk.MustExec("alter table t add index idx_test(b);")
ingest.ForceSyncFlagForTest = false
tk.MustExec("update t set b = b + 1;")
}

func TestAddIndexRemoteDuplicateCheck(t *testing.T) {
store := realtikvtest.CreateMockStoreAndSetup(t)
tk := testkit.NewTestKit(t, store)
Expand Down

0 comments on commit 098213a

Please sign in to comment.