-
Notifications
You must be signed in to change notification settings - Fork 1
/
pool.go
142 lines (128 loc) · 2.71 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package pool
import (
"errors"
"fmt"
"sync"
"time"
)
// Sentinel errors returned by the pool.
var (
	// ErrInvalidConfig is returned by NewGenericPool when its arguments are rejected.
	ErrInvalidConfig = errors.New("invalid pool config")
	// ErrPoolClosed is returned by pool operations that find the closed flag set.
	ErrPoolClosed = errors.New("pool closed")
)
// Poolable is the contract a resource must satisfy to be managed by the pool.
type Poolable interface {
	// Close releases the resource.
	Close() error
	// GetActiveTime returns the timestamp that Get compares against the
	// pool's maxLifetime to decide whether the resource has expired.
	GetActiveTime() time.Time
}
// factory creates a new Poolable resource, or returns an error if it cannot.
type factory func() (Poolable, error)
// Pool is the set of operations exposed by a resource pool.
type Pool interface {
	Get() (Poolable, error) // acquire a resource
	Put(Poolable) error // release a resource back to the pool
	Len() int // number of resources
	Close(Poolable) error // close a single resource
	Shutdown() error // shut down the pool
}
// GenericPool keeps idle Poolable resources in a buffered channel and tracks
// how many resources are currently open. After construction, numOpen is only
// modified while the embedded mutex is held.
type GenericPool struct {
	sync.Mutex
	pool chan Poolable // idle resources: Put sends here, Get receives from here
	maxOpen int // maximum number of resources in the pool
	numOpen int // current number of resources in the pool
	minOpen int // minimum number of resources in the pool
	closed bool // whether the pool has been shut down
	maxLifetime time.Duration // Get closes resources whose active time is older than this; values <= 0 disable the check
	factory factory // function used to create a new resource
}
// NewGenericPool builds a pool that holds at most maxOpen resources and tries
// to pre-create minOpen of them with factory.
//
// It returns ErrInvalidConfig when factory is nil, when maxOpen is not
// positive, or when minOpen exceeds maxOpen. A maxLifetime greater than zero
// makes Get discard resources whose active time is older than that duration.
//
// Warm-up is best effort: a factory call that fails (or yields a nil
// resource) is skipped, so the pool may start with fewer than minOpen
// resources. Missing resources are created lazily by Get.
func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) {
	// A nil factory would panic on the first creation attempt, so reject it
	// up front together with the size checks.
	if factory == nil || maxOpen <= 0 || minOpen > maxOpen {
		return nil, ErrInvalidConfig
	}
	p := &GenericPool{
		maxOpen:     maxOpen,
		minOpen:     minOpen,
		maxLifetime: maxLifetime,
		factory:     factory,
		pool:        make(chan Poolable, maxOpen),
	}
	for i := 0; i < minOpen; i++ {
		res, err := factory()
		if err != nil || res == nil {
			// Best effort: never queue a failed or nil resource.
			continue
		}
		p.numOpen++
		p.pool <- res
	}
	return p, nil
}
// Get hands out a resource, taking an idle one when available and otherwise
// relying on getOrCreate to create or wait for one.
//
// It returns ErrPoolClosed when the pool has been shut down, and any error
// reported by getOrCreate. When maxLifetime is positive, resources whose
// active time plus maxLifetime lies in the past are closed and skipped.
func (p *GenericPool) Get() (Poolable, error) {
	// Shutdown writes closed while holding the mutex, so read it the same way.
	p.Lock()
	closed := p.closed
	p.Unlock()
	if closed {
		return nil, ErrPoolClosed
	}

	for {
		closer, err := p.getOrCreate()
		if err != nil {
			return nil, err
		}
		if closer == nil {
			// A receive from the idle channel after Shutdown closed it yields
			// a nil resource; report the shutdown instead of dereferencing it.
			return nil, ErrPoolClosed
		}
		// With a lifetime configured, a resource whose active time plus
		// maxLifetime is earlier than now has expired.
		if p.maxLifetime > 0 && closer.GetActiveTime().Add(p.maxLifetime).Before(time.Now()) {
			fmt.Println("time out close")
			// Best effort: the expired resource is dropped either way, so its
			// close error does not affect the caller.
			_ = p.Close(closer)
			continue
		}
		return closer, nil
	}
}
// getOrCreate returns an idle resource if one is immediately available.
// Otherwise it creates a new one while the pool is below maxOpen, or waits
// for a resource to be returned once the pool is at capacity.
//
// It returns the factory's error when creation fails, and ErrPoolClosed when
// the idle channel has been closed.
func (p *GenericPool) getOrCreate() (Poolable, error) {
	// Fast path: take an idle resource without touching the mutex.
	select {
	case closer, ok := <-p.pool:
		if !ok {
			return nil, ErrPoolClosed
		}
		return closer, nil
	default:
	}

	p.Lock()
	if p.numOpen >= p.maxOpen {
		// At capacity: release the mutex before blocking. Waiting with the
		// lock held would stall Close and Shutdown until some other
		// goroutine happened to return a resource.
		p.Unlock()
		closer, ok := <-p.pool
		if !ok {
			return nil, ErrPoolClosed
		}
		return closer, nil
	}

	// Create a new resource. The mutex stays held so that the capacity check
	// above and the counter update below are one atomic step.
	closer, err := p.factory()
	if err != nil {
		p.Unlock()
		return nil, err
	}
	p.numOpen++
	p.Unlock()
	return closer, nil
}
// Put hands a single resource back to the pool by sending it on the idle
// channel. It returns ErrPoolClosed, without queuing the resource, when the
// closed flag is set.
//
// Neither the flag read nor the send takes the mutex, and the send blocks
// while the idle channel is full.
func (p *GenericPool) Put(closer Poolable) error {
	if !p.closed {
		p.pool <- closer
		return nil
	}
	return ErrPoolClosed
}
// Len reports the pool's current open-resource counter (numOpen), which
// covers both idle and handed-out resources. The counter is read without
// taking the mutex.
func (p *GenericPool) Len() int {
	open := p.numOpen
	return open
}
// Close closes a single resource and removes it from the pool's
// open-resource count. It returns the error reported by the resource's own
// Close; the count is decremented even when that call fails.
func (p *GenericPool) Close(closer Poolable) error {
	p.Lock()
	// Deferred so the mutex is released even if the resource's Close panics.
	defer p.Unlock()

	err := closer.Close()
	p.numOpen--
	return err
}
// Shutdown closes the pool and every idle resource it still holds.
//
// It returns ErrPoolClosed if the pool was already shut down. Otherwise it
// marks the pool closed, closes the idle channel, closes each queued resource
// and decrements the open count for it. All queued resources are closed even
// if some fail; the first such failure is returned after the pool has been
// shut down. Resources that are currently handed out are not touched.
func (p *GenericPool) Shutdown() error {
	p.Lock()
	defer p.Unlock()

	// Checked under the mutex so that two concurrent Shutdown calls cannot
	// both reach close(p.pool), which would panic on the second close.
	if p.closed {
		return ErrPoolClosed
	}
	p.closed = true
	close(p.pool)

	var firstErr error
	// Ranging over the closed channel drains the buffered resources and stops.
	for closer := range p.pool {
		if err := closer.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
		p.numOpen--
	}
	return firstErr
}