Skip to content

Commit

Permalink
timer: make sure timer which has a max delay will be scheduled first (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Nov 6, 2024
1 parent 849dcc0 commit 4c32fcc
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/timer/runtime/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ go_test(
embed = [":runtime"],
flaky = True,
race = "on",
shard_count = 23,
shard_count = 24,
deps = [
"//pkg/testkit/testsetup",
"//pkg/timer/api",
Expand Down
53 changes: 45 additions & 8 deletions pkg/timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"fmt"
"maps"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -241,9 +242,7 @@ func (rt *TimerGroupRuntime) fullRefreshTimers() {

func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
now := rt.nowFunc()
var retryTimerIDs []string
var retryTimerKeys []string
var busyWorkers map[string]struct{}
var readyTimers []*timerCacheItem
rt.cache.iterTryTriggerTimers(func(timer *api.TimerRecord, tryTriggerTime time.Time, nextEventTime *time.Time) bool {
if tryTriggerTime.After(now) {
return false
Expand All @@ -253,9 +252,45 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {
return true
}

if readyTimers == nil {
readyTimers = make([]*timerCacheItem, 0, 8)
}

readyTimers = append(readyTimers, &timerCacheItem{
timer: timer,
nextEventTime: nextEventTime,
})
return true
})

if len(readyTimers) == 0 {
return
}

// resort timer to make sure the timer has the smallest nextEventTime has a higher priority to trigger
slices.SortFunc(readyTimers, func(a, b *timerCacheItem) int {
if a.nextEventTime == nil || b.nextEventTime == nil {
if a.nextEventTime != nil {
return 1
}

if b.nextEventTime != nil {
return -1
}

return 0
}
return a.nextEventTime.Compare(*b.nextEventTime)
})

var retryTimerIDs []string
var retryTimerKeys []string
var busyWorkers map[string]struct{}
for i, item := range readyTimers {
timer := item.timer
worker, ok := rt.ensureWorker(timer.HookClass)
if !ok {
return true
continue
}

eventID := timer.EventID
Expand All @@ -273,20 +308,22 @@ func (rt *TimerGroupRuntime) tryTriggerTimerEvents() {

select {
case <-rt.ctx.Done():
return false
return
case worker.ch <- req:
rt.cache.setTimerProcStatus(timer.ID, procTriggering, eventID)
default:
if busyWorkers == nil {
busyWorkers = make(map[string]struct{})
busySize := len(readyTimers) - i
retryTimerIDs = make([]string, 0, busySize)
retryTimerKeys = make([]string, 0, busySize)
busyWorkers = make(map[string]struct{}, busySize)
}

busyWorkers[timer.HookClass] = struct{}{}
retryTimerIDs = append(retryTimerIDs, timer.ID)
retryTimerKeys = append(retryTimerKeys, fmt.Sprintf("[%s] %s", timer.Namespace, timer.Key))
}
return true
})
}

if len(retryTimerIDs) > 0 {
busyWorkerList := make([]string, 0, len(busyWorkers))
Expand Down
45 changes: 45 additions & 0 deletions pkg/timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,51 @@ func TestTryTriggerTimer(t *testing.T) {
consumeAndVerify(t3)
}

func TestTryTriggerTimePriority(t *testing.T) {
now := time.Now()
store := api.NewMemoryTimerStore()
defer store.Close()
runtime := NewTimerRuntimeBuilder("g1", store).Build()
runtime.setNowFunc(func() time.Time {
return now
})
runtime.initCtx()
ch := make(chan *triggerEventRequest, 2)
runtime.workers["hook1"] = &hookWorker{ch: ch}

t1 := newTestTimer("t1", "1m", now.Add(-time.Hour))
runtime.cache.updateTimer(t1)
runtime.cache.updateNextTryTriggerTime(t1.ID, now.Add(-3*time.Minute))

t2 := newTestTimer("t2", "1m", now.Add(-2*time.Hour))
runtime.cache.updateTimer(t2)
runtime.cache.updateNextTryTriggerTime(t2.ID, now.Add(-2*time.Minute))

t3 := newTestTimer("t3", "1h", now)
t3.EventStatus = api.SchedEventTrigger
t3.EventID = "event2"
t3.EventStart = now.Add(-time.Minute)
t3.Enable = false
runtime.cache.updateTimer(t3)

t4 := newTestTimer("t4", "1m", now.Add(-10*time.Hour))
runtime.cache.updateTimer(t4)
runtime.cache.updateNextTryTriggerTime(t4.ID, now.Add(time.Minute))

// nextEventTime: t3 (nil) < t4 < t2 < t1
// nextTryTriggerTime: t1 < t2 < t3 (eventStart) < t4
// we should test the priority trigger is ordered by `nextEventTime` because to ensure the timer who has a max
// delay time will be triggered first.
// t4 should not be scheduled for the next trigger time is after now.
// so, t3 and t2 will be triggered when the capacity of chan is 2
runtime.tryTriggerTimerEvents()
require.Equal(t, procTriggering, runtime.cache.items[t2.ID].procStatus)
require.Equal(t, procTriggering, runtime.cache.items[t3.ID].procStatus)
// t1, t4 should keep not triggered
require.Equal(t, procIdle, runtime.cache.items[t1.ID].procStatus)
require.Equal(t, procIdle, runtime.cache.items[t4.ID].procStatus)
}

func TestHandleHookWorkerResponse(t *testing.T) {
now := time.Now()
store := api.NewMemoryTimerStore()
Expand Down

0 comments on commit 4c32fcc

Please sign in to comment.