-
Notifications
You must be signed in to change notification settings - Fork 25
/
ttl.go
117 lines (97 loc) · 1.8 KB
/
ttl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package CouloyDB
import (
"github.com/Kirov7/CouloyDB/public/ds"
"log"
"sync"
"sync/atomic"
"time"
)
type ttl struct {
mu *sync.RWMutex
started *atomic.Bool
eventCh chan struct{}
timeHeap *ds.TimeHeap
deleter func(key string) error
}
func newTTL(deleter func(key string) error) *ttl {
return &ttl{
mu: &sync.RWMutex{},
started: &atomic.Bool{},
eventCh: make(chan struct{}),
timeHeap: ds.NewTimeHeap(),
deleter: deleter,
}
}
func (ttl *ttl) add(job *ds.Job) {
ttl.mu.Lock()
ttl.timeHeap.Push(job)
ttl.mu.Unlock()
ttl.notify()
}
func (ttl *ttl) del(key string) {
ttl.mu.Lock()
ttl.timeHeap.Remove(key)
ttl.mu.Unlock()
ttl.notify()
}
func (ttl *ttl) isExpired(key string) bool {
ttl.mu.RLock()
defer ttl.mu.RUnlock()
job := ttl.timeHeap.Get(key)
return job != nil && !job.Expiration.After(time.Now())
}
func (ttl *ttl) start() {
ttl.started.Store(true)
for {
if !ttl.started.Load() {
break
}
ttl.exec()
}
}
func (ttl *ttl) stop() {
ttl.started.Store(false)
ttl.mu.Lock()
close(ttl.eventCh)
ttl.mu.Unlock()
}
const MaxDuration time.Duration = 1<<63 - 1
func (ttl *ttl) exec() {
now := time.Now()
duration := MaxDuration
ttl.mu.RLock()
job := ttl.timeHeap.Peek()
ttl.mu.RUnlock()
if job != nil {
if job.Expiration.After(now) {
duration = job.Expiration.Sub(now)
} else {
duration = 0
}
}
if duration > 0 {
timer := time.NewTimer(duration)
defer timer.Stop()
select {
case <-ttl.eventCh:
return
case <-timer.C:
}
}
ttl.mu.Lock()
job = ttl.timeHeap.Pop()
ttl.mu.Unlock()
if job == nil {
return
}
go func() {
if err := ttl.deleter(job.Key); err != nil {
log.Printf("there is a error occured by deleter: %v", err.Error())
}
}()
}
func (ttl *ttl) notify() {
if ttl.started.Load() {
ttl.eventCh <- struct{}{}
}
}