Skip to content

Commit

Permalink
br: fix the issue that state not set correctly after resolve locks (p…
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer committed Sep 12, 2023
1 parent a01042c commit 8fd8cce
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 3 deletions.
11 changes: 10 additions & 1 deletion br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (c *CheckpointAdvancer) Config() config.Config {
return c.cfg
}

// GetInResolvingLock only used for test.
func (c *CheckpointAdvancer) GetInResolvingLock() bool {
return c.inResolvingLock.Load()
}

// GetCheckpointInRange scans the regions in the range,
// collect them to the collector.
func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error {
Expand Down Expand Up @@ -499,8 +504,10 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error {
// use new context here to avoid timeout
ctx := context.Background()
c.asyncResolveLocksForRanges(ctx, targets)
} else {
// don't forget set state back
c.inResolvingLock.Store(false)
}
c.inResolvingLock.Store(false)
}
threshold := c.Config().GetDefaultStartPollThreshold()
if err := c.subscribeTick(cx); err != nil {
Expand Down Expand Up @@ -544,6 +551,7 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
// run in another goroutine
// do not block main tick here
go func() {
failpoint.Inject("AsyncResolveLocks", func() {})
handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) {
// we will scan all locks and try to resolve them by check txn status.
return tikv.ResolveLocksForRange(
Expand Down Expand Up @@ -580,5 +588,6 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar
c.lastCheckpointMu.Lock()
c.lastCheckpoint.resolveLockTime = time.Now()
c.lastCheckpointMu.Unlock()
c.inResolvingLock.Store(false)
}()
}
11 changes: 9 additions & 2 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ func TestResolveLock(t *testing.T) {
}
}()
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks", `return(true)`))
// make sure asyncResolveLocks stuck in optionalTick later.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks", `pause`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks"))
}()
Expand Down Expand Up @@ -322,10 +324,15 @@ func TestResolveLock(t *testing.T) {
time.Second, 50*time.Millisecond)
coll := streamhelper.NewClusterCollector(ctx, env)
err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll)

require.NoError(t, err)
// now the lock state must be ture. because tick finished and asyncResolveLocks got stuck.
require.True(t, adv.GetInResolvingLock())
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks"))
require.Eventually(t, func() bool { return resolveLockRef.Load() },
8*time.Second, 50*time.Microsecond)
require.NoError(t, err)
// state must set to false after tick
require.Eventually(t, func() bool { return !adv.GetInResolvingLock() },
8*time.Second, 50*time.Microsecond)
r, err := coll.Finish(ctx)
require.NoError(t, err)
require.Len(t, r.FailureSubRanges, 0)
Expand Down

0 comments on commit 8fd8cce

Please sign in to comment.