diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index a15a98390ee52..03d3d3b3f38a1 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -160,6 +160,11 @@ func (c *CheckpointAdvancer) Config() config.Config { return c.cfg } +// GetInResolvingLock only used for test. +func (c *CheckpointAdvancer) GetInResolvingLock() bool { + return c.inResolvingLock.Load() +} + // GetCheckpointInRange scans the regions in the range, // collect them to the collector. func (c *CheckpointAdvancer) GetCheckpointInRange(ctx context.Context, start, end []byte, collector *clusterCollector) error { @@ -499,8 +504,10 @@ func (c *CheckpointAdvancer) optionalTick(cx context.Context) error { // use new context here to avoid timeout ctx := context.Background() c.asyncResolveLocksForRanges(ctx, targets) + } else { + // don't forget set state back + c.inResolvingLock.Store(false) } - c.inResolvingLock.Store(false) } threshold := c.Config().GetDefaultStartPollThreshold() if err := c.subscribeTick(cx); err != nil { @@ -544,6 +551,7 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar // run in another goroutine // do not block main tick here go func() { + failpoint.Inject("AsyncResolveLocks", func() {}) handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { // we will scan all locks and try to resolve them by check txn status. return tikv.ResolveLocksForRange( @@ -580,5 +588,6 @@ func (c *CheckpointAdvancer) asyncResolveLocksForRanges(ctx context.Context, tar c.lastCheckpointMu.Lock() c.lastCheckpoint.resolveLockTime = time.Now() c.lastCheckpointMu.Unlock() + c.inResolvingLock.Store(false) }() } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 6783584bda282..d284c4bb03666 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -269,6 +269,8 @@ func TestResolveLock(t *testing.T) { } }() require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks", `return(true)`)) + // make sure asyncResolveLocks stuck in optionalTick later. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks", `pause`)) defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/NeedResolveLocks")) }() @@ -322,10 +324,15 @@ func TestResolveLock(t *testing.T) { time.Second, 50*time.Millisecond) coll := streamhelper.NewClusterCollector(ctx, env) err := adv.GetCheckpointInRange(ctx, []byte{}, []byte{}, coll) - + require.NoError(t, err) + // now the lock state must be ture. because tick finished and asyncResolveLocks got stuck. + require.True(t, adv.GetInResolvingLock()) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/br/pkg/streamhelper/AsyncResolveLocks")) require.Eventually(t, func() bool { return resolveLockRef.Load() }, 8*time.Second, 50*time.Microsecond) - require.NoError(t, err) + // state must set to false after tick + require.Eventually(t, func() bool { return !adv.GetInResolvingLock() }, + 8*time.Second, 50*time.Microsecond) r, err := coll.Finish(ctx) require.NoError(t, err) require.Len(t, r.FailureSubRanges, 0)