Skip to content

Commit

Permalink
ttl: make some integration test faster (#56719)
Browse files Browse the repository at this point in the history
close #56718
  • Loading branch information
lcwangchao authored Oct 21, 2024
1 parent 43b4336 commit 73584bb
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 150 deletions.
1 change: 1 addition & 0 deletions pkg/timer/runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/timer/api",
"//pkg/timer/metrics",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"//pkg/util/timeutil",
"@com_github_google_uuid//:uuid",
Expand Down
11 changes: 11 additions & 0 deletions pkg/timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/pkg/timer/api"
"github.com/pingcap/tidb/pkg/timer/metrics"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand All @@ -41,6 +42,16 @@ var (
checkWaitCloseTimerInterval = 10 * time.Second
)

// init lowers the timer runtime's throttling intervals when the package is
// running inside a test binary.
func init() {
	if !intest.InTest {
		return
	}
	// In production, minTriggerEventInterval and batchProcessWatchRespInterval
	// keep events from being triggered so fast that they exhaust the CPU.
	// In the test environment a much smaller value is acceptable and makes
	// the tests run faster.
	batchProcessWatchRespInterval = time.Millisecond
	minTriggerEventInterval = time.Millisecond
}

var idleWatchChan = make(api.WatchTimerChan)

// TimerRuntimeBuilder is used to build a timer runtime
Expand Down
6 changes: 6 additions & 0 deletions pkg/timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ func TestHandleHookWorkerResponse(t *testing.T) {
}

func TestNextTryTriggerDuration(t *testing.T) {
origMinTriggerEventInterval := minTriggerEventInterval
minTriggerEventInterval = time.Second
defer func() {
minTriggerEventInterval = origMinTriggerEventInterval
}()

now := time.Now()
store := api.NewMemoryTimerStore()
defer store.Close()
Expand Down
1 change: 1 addition & 0 deletions pkg/ttl/client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ddl/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_google_uuid//:uuid",
"@com_github_pingcap_errors//:errors",
Expand Down
56 changes: 32 additions & 24 deletions pkg/ttl/client/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
Expand Down Expand Up @@ -321,27 +322,17 @@ func (c *mockClient) Command(ctx context.Context, cmdType string, request any, r
ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(ttlCmdKeyLeaseSeconds))
defer cancel()

reqID, err := c.sendCmd(ctx, cmdType, request)
reqID, respCh, err := c.sendCmd(ctx, cmdType, request)
if err != nil {
return reqID, err
}

responseKey := ttlCmdKeyResponsePrefix + reqID
for ctx.Err() == nil {
time.Sleep(time.Second)
c.Lock()
val, ok := c.store[responseKey]
c.Unlock()

if !ok {
continue
}

res, ok := val.(*cmdResponse)
select {
case res, ok := <-respCh:
intest.Assert(ok, "response channel should not be closed")
if !ok {
return reqID, errors.New("response cannot be casted to *cmdResponse")
return "", errors.New("response channel is closed")
}

if res.ErrorMessage != "" {
return reqID, errors.New(res.ErrorMessage)
}
Expand All @@ -350,15 +341,16 @@ func (c *mockClient) Command(ctx context.Context, cmdType string, request any, r
return reqID, err
}
return reqID, nil
case <-ctx.Done():
return reqID, ctx.Err()
}
return reqID, ctx.Err()
}

func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request any) (string, error) {
func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request any) (string, chan *cmdResponse, error) {
reqID := uuid.New().String()
data, err := json.Marshal(request)
if err != nil {
return reqID, err
return reqID, nil, err
}

req := &CmdRequest{
Expand All @@ -369,18 +361,22 @@ func (c *mockClient) sendCmd(ctx context.Context, cmdType string, request any) (

c.Lock()
defer c.Unlock()
key := ttlCmdKeyRequestPrefix + reqID
c.store[key] = req
reqKey := ttlCmdKeyRequestPrefix + reqID
c.store[reqKey] = req
respKey := ttlCmdKeyResponsePrefix + reqID
respCh := make(chan *cmdResponse, 1)
c.store[respKey] = respCh
for _, ch := range c.commandWatchers {
select {
case <-ctx.Done():
return reqID, ctx.Err()
return reqID, nil, ctx.Err()
case ch <- req:
default:
return reqID, errors.New("watcher channel is blocked")
intest.Assert(false, "watcher channel should not be blocked")
return reqID, nil, errors.New("watcher channel is blocked")
}
}
return reqID, nil
return reqID, respCh, nil
}

// TakeCommand implements the CommandClient
Expand Down Expand Up @@ -414,7 +410,19 @@ func (c *mockClient) ResponseCommand(_ context.Context, reqID string, obj any) e
resp.Data = jsonData
}

c.store[ttlCmdKeyResponsePrefix+reqID] = resp
respKey := ttlCmdKeyResponsePrefix + reqID
item, ok := c.store[respKey]
if !ok {
return errors.New("response key not found for: " + reqID)
}
delete(c.store, respKey)
respCh := item.(chan *cmdResponse)
select {
case item.(chan *cmdResponse) <- resp:
close(respCh)
default:
intest.Assert(false, "response channel should not be blocked")
}
return nil
}

Expand Down
28 changes: 28 additions & 0 deletions pkg/ttl/ttlworker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ const taskManagerLoopTickerInterval = time.Minute
const ttlTaskHeartBeatTickerInterval = time.Minute
const ttlGCInterval = time.Hour

// getCheckJobInterval returns the tick interval that jobLoop uses for its
// scheduleJobTicker and jobCheckTicker.
//
// It defaults to jobManagerLoopTickerInterval. The "check-job-interval"
// failpoint can override it with an int, which is converted directly to a
// time.Duration and is therefore interpreted as a number of nanoseconds.
// NOTE(review): this relies on failpoint.Inject turning the callback's return
// into a return from this function — confirm against the failpoint package.
func getCheckJobInterval() time.Duration {
failpoint.Inject("check-job-interval", func(val failpoint.Value) time.Duration {
// The single-value type assertion panics if the injected value is not an int.
return time.Duration(val.(int))
})
return jobManagerLoopTickerInterval
}

// getJobManagerLoopSyncTimerInterval returns the tick interval that jobLoop
// uses for its timerTicker.
//
// It defaults to one second. The "sync-timer" failpoint can override it with
// an int, which is converted directly to a time.Duration and is therefore
// interpreted as a number of nanoseconds.
// NOTE(review): this relies on failpoint.Inject turning the callback's return
// into a return from this function — confirm against the failpoint package.
func getJobManagerLoopSyncTimerInterval() time.Duration {
failpoint.Inject("sync-timer", func(val failpoint.Value) time.Duration {
// The single-value type assertion panics if the injected value is not an int.
return time.Duration(val.(int))
})
return time.Second
}

func getUpdateInfoSchemaCacheInterval() time.Duration {
failpoint.Inject("update-info-schema-cache-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
Expand All @@ -55,6 +69,13 @@ func getResizeWorkersInterval() time.Duration {
return resizeWorkersInterval
}

// getTaskManagerLoopCheckTaskInterval returns the tick interval that jobLoop
// uses for its taskCheckTicker.
//
// It defaults to five seconds. The "check-task-interval" failpoint can
// override it with an int, which is converted directly to a time.Duration and
// is therefore interpreted as a number of nanoseconds.
// NOTE(review): this relies on failpoint.Inject turning the callback's return
// into a return from this function — confirm against the failpoint package.
func getTaskManagerLoopCheckTaskInterval() time.Duration {
failpoint.Inject("check-task-interval", func(val failpoint.Value) time.Duration {
// The single-value type assertion panics if the injected value is not an int.
return time.Duration(val.(int))
})
return time.Second * 5
}

func getTaskManagerLoopTickerInterval() time.Duration {
failpoint.Inject("task-manager-loop-interval", func(val failpoint.Value) time.Duration {
return time.Duration(val.(int))
Expand All @@ -68,3 +89,10 @@ func getTaskManagerHeartBeatExpireInterval() time.Duration {
})
return 2 * ttlTaskHeartBeatTickerInterval
}

// getCheckJobTriggeredInterval returns the period of the ticker created in
// the goroutine that triggerTTLJob starts.
//
// It defaults to two seconds. The "check-job-triggered-interval" failpoint
// can override it with an int, which is converted directly to a time.Duration
// and is therefore interpreted as a number of nanoseconds.
// NOTE(review): this relies on failpoint.Inject turning the callback's return
// into a return from this function — confirm against the failpoint package.
func getCheckJobTriggeredInterval() time.Duration {
failpoint.Inject("check-job-triggered-interval", func(val failpoint.Value) time.Duration {
// The single-value type assertion panics if the injected value is not an int.
return time.Duration(val.(int))
})
return 2 * time.Second
}
30 changes: 21 additions & 9 deletions pkg/ttl/ttlworker/job_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,14 +181,14 @@ func (m *JobManager) jobLoop() error {
resizeWorkersTicker := time.Tick(getResizeWorkersInterval())
gcTicker := time.Tick(ttlGCInterval)

scheduleJobTicker := time.Tick(jobManagerLoopTickerInterval)
jobCheckTicker := time.Tick(jobManagerLoopTickerInterval)
scheduleJobTicker := time.Tick(getCheckJobInterval())
jobCheckTicker := time.Tick(getCheckJobInterval())
updateJobHeartBeatTicker := time.Tick(jobManagerLoopTickerInterval)
timerTicker := time.Tick(time.Second)
timerTicker := time.Tick(getJobManagerLoopSyncTimerInterval())

scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval())
updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval)
taskCheckTicker := time.Tick(time.Second * 5)
taskCheckTicker := time.Tick(getTaskManagerLoopCheckTaskInterval())
checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval())

cmdWatcher := m.cmdCli.WatchCommand(m.ctx)
Expand Down Expand Up @@ -297,7 +297,13 @@ func (m *JobManager) onTimerTick(se session.Session, rt *ttlTimerRuntime, syncer
rt.Resume()
lastSyncTime, lastSyncVer := syncer.GetLastSyncInfo()
sinceLastSync := now.Sub(lastSyncTime)
if sinceLastSync < 5*time.Second {
minSyncDuration := 5 * time.Second
if intest.InTest {
// in test, we can set the minSyncDuration to 1ms to boost the test speed
minSyncDuration = time.Millisecond
}

if sinceLastSync < minSyncDuration {
// limit the timer sync frequency to at most once per minSyncDuration (5 seconds outside of tests)
return
}
Expand Down Expand Up @@ -419,7 +425,7 @@ func (m *JobManager) triggerTTLJob(requestID string, cmd *client.TriggerNewTTLJo

go func() {
defer cancel()
ticker := time.NewTicker(2 * time.Second)
ticker := time.NewTicker(getCheckJobTriggeredInterval())
defer ticker.Stop()
loop:
for {
Expand Down Expand Up @@ -716,9 +722,15 @@ func (m *JobManager) couldLockJob(tableStatus *cache.TableStatus, table *cache.P
if tableStatus.CurrentJobOwnerID != "" {
// check whether its heartbeat time has expired
hbTime := tableStatus.CurrentJobOwnerHBTime
// a more concrete value is `2 * max(updateTTLTableStatusCacheInterval, jobManagerLoopTickerInterval)`, but the
// `updateTTLTableStatusCacheInterval` is greater than `jobManagerLoopTickerInterval` in most cases.
if hbTime.Add(2 * getUpdateTTLTableStatusCacheInterval()).Before(now) {
// jobManagerLoopTickerInterval is used to do heartbeat periodically.
// Use twice the time to detect the heartbeat timeout.
hbTimeout := jobManagerLoopTickerInterval * 2
if interval := getUpdateTTLTableStatusCacheInterval() * 2; interval > hbTimeout {
// tableStatus is fetched from the cache, which may contain stale data.
// So if cache update interval > heartbeat interval, use the cache update interval instead.
hbTimeout = interval
}
if hbTime.Add(hbTimeout).Before(now) {
logutil.Logger(m.ctx).Info("task heartbeat has stopped", zap.Int64("tableID", table.ID), zap.Time("hbTime", hbTime), zap.Time("now", now))
return true
}
Expand Down
Loading

0 comments on commit 73584bb

Please sign in to comment.