replay.go
// Package replay provides configurable HTTP caching middleware for Go servers.
package replay

import (
	"bytes"
	"container/list"
	"fmt"
	"io"
	"log"
	"net/http"
	"sort"
	"strings"
	"sync"
	"time"
)

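// A minimal usage sketch. The mux, handler, address, and option values below
// are illustrative only and are not part of this package:
//
//	mux := http.NewServeMux()
//	mux.HandleFunc("/users", usersHandler)
//	cache := NewCache(
//		WithMaxSize(100),
//		WithTTL(2*time.Minute),
//		WithCacheFilters([]string{"URL", "Method"}),
//	)
//	http.ListenAndServe(":8080", cache.Middleware(mux))
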
// Cache stores the cache configuration and data.
type Cache struct {
	cacheList   *list.List    // Linked list containing the cache data
	metrics     *CacheMetrics // Metrics for cache performance
	*sync.Mutex               // Mutex for synchronizing access to the cache
	CacheConfig               // Cache configuration options
}

// CacheConfig stores the configuration options for the cache.
// All of these options can be set using CacheOption functions.
type CacheConfig struct {
	maxSize        int            // Maximum number of entries in the cache
	maxMemory      uint64         // Maximum memory usage for the cache
	evictionPolicy EvictionPolicy // Cache eviction policy (LRU, FIFO)
	evictionTimer  time.Duration  // Time interval for checking expired entries
	ttl            time.Duration  // Time-to-live for cache entries
	maxTtl         time.Duration  // Maximum lifespan for cache entries
	cacheFilters   []string       // Filters used for cache key generation
	cacheFailures  bool           // Allow caching for failed requests
	l              *log.Logger    // Logger for cache logging
}

// CacheEntry stores a single cache item.
type CacheEntry struct {
	key          string
	value        *CacheResponse
	size         uint64
	created      time.Time
	lastAccessed time.Time
}

// CacheResponse stores the response info to be cached.
type CacheResponse struct {
	StatusCode int
	Header     http.Header
	Body       []byte
}

// CacheMetrics stores the cache metrics.
type CacheMetrics struct {
	Hits          int
	Misses        int
	Evictions     int
	CurrentSize   int
	CurrentMemory uint64
}

const (
	DefaultMaxSize        = 25
	DefaultMaxMemory      = 100 * 1024 * 1024
	DefaultEvictionPolicy = LRU
	DefaultEvictionTimer  = 1 * time.Minute
	DefaultTTL            = 5 * time.Minute
	DefaultMaxTTL         = 10 * time.Minute
	DefaultFilter         = "URL"
)

// EvictionPolicy selects how entries are removed when the cache is full.
type EvictionPolicy string

const (
	LRU  EvictionPolicy = "LRU"
	FIFO EvictionPolicy = "FIFO"
)

// CacheOption configures a Cache; pass options to NewCache.
type CacheOption func(*Cache)

// NewCache initializes a new instance of Cache with the given options.
func NewCache(options ...CacheOption) *Cache {
	c := &Cache{
		CacheConfig: CacheConfig{
			maxSize:        DefaultMaxSize,
			maxMemory:      DefaultMaxMemory,
			evictionPolicy: DefaultEvictionPolicy,
			evictionTimer:  DefaultEvictionTimer,
			ttl:            DefaultTTL,
			maxTtl:         DefaultMaxTTL,
			cacheFilters:   []string{DefaultFilter},
			cacheFailures:  false,
			l:              log.New(io.Discard, "", 0),
		},
		cacheList: list.New(),
		metrics:   &CacheMetrics{},
		Mutex:     &sync.Mutex{},
	}
	for _, option := range options {
		option(c)
	}
	go c.clearExpiredEntries()
	return c
}

// WithMaxSize sets the maximum number of entries in the cache.
func WithMaxSize(maxSize int) CacheOption {
	return func(c *Cache) {
		if maxSize != 0 {
			c.maxSize = maxSize
		}
	}
}

// WithMaxMemory sets the maximum memory usage of the cache.
func WithMaxMemory(maxMemory uint64) CacheOption {
	return func(c *Cache) {
		if maxMemory != 0 {
			c.maxMemory = maxMemory
		}
	}
}

// WithEvictionPolicy sets the eviction policy for the cache [FIFO, LRU].
func WithEvictionPolicy(evictionPolicy string) CacheOption {
	return func(c *Cache) {
		if evictionPolicy != "" {
			c.evictionPolicy = EvictionPolicy(evictionPolicy)
		}
	}
}

// WithEvictionTimer sets the time between cache eviction checks.
// Durations of one minute or less are ignored.
func WithEvictionTimer(evictionTimer time.Duration) CacheOption {
	return func(c *Cache) {
		if evictionTimer > time.Minute {
			c.evictionTimer = evictionTimer
		}
	}
}

// WithTTL sets how long a cache entry can live without being accessed.
func WithTTL(ttl time.Duration) CacheOption {
	return func(c *Cache) {
		if ttl > 0 {
			c.ttl = ttl
		}
	}
}

// WithMaxTTL sets the maximum time a cache entry can live, including renewals.
// Values that do not exceed the configured TTL are ignored.
func WithMaxTTL(maxTtl time.Duration) CacheOption {
	return func(c *Cache) {
		if maxTtl > c.ttl {
			c.maxTtl = maxTtl
		}
	}
}

// WithCacheFilters sets the filters used for generating cache keys.
// Recognized filters are "URL" (always applied), "Method", and "Header".
func WithCacheFilters(cacheFilters []string) CacheOption {
	return func(c *Cache) {
		if len(cacheFilters) != 0 {
			c.cacheFilters = cacheFilters
		}
	}
}

// WithCacheFailures sets whether to cache failed requests.
func WithCacheFailures(cacheFailures bool) CacheOption {
	return func(c *Cache) {
		c.cacheFailures = cacheFailures
	}
}

// WithLogger sets the logger to use for cache logging.
func WithLogger(l *log.Logger) CacheOption {
	return func(c *Cache) {
		c.l = l
	}
}

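// A sketch of enabling the (discarded-by-default) logging and failure caching;
// the os.Stderr destination and the prefix are illustrative choices:
//
//	cache := NewCache(
//		WithLogger(log.New(os.Stderr, "replay: ", log.LstdFlags)),
//		WithCacheFailures(true),
//	)
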
// Middleware intercepts all HTTP requests on a handler.
func (c *Cache) Middleware(next http.Handler) http.Handler {
	// Use next.ServeHTTP rather than a type assertion so any http.Handler
	// (e.g. *http.ServeMux) works without panicking.
	return c.middleware(next.ServeHTTP)
}

// MiddlewareFunc intercepts HTTP requests to a given route.
func (c *Cache) MiddlewareFunc(next http.HandlerFunc) http.HandlerFunc {
	return c.middleware(next)
}

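// A per-route sketch of wrapping a single handler; the routes and handler
// names are illustrative:
//
//	mux := http.NewServeMux()
//	mux.HandleFunc("/expensive", cache.MiddlewareFunc(expensiveHandler))
//	mux.HandleFunc("/live", liveHandler) // left uncached
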
func (c *Cache) middleware(next http.HandlerFunc) http.HandlerFunc {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		key := c.generateKey(r)
		wasCached := false
		// Deferred first, so it runs after the deferred Unlock below; it must
		// re-acquire the lock before touching the shared metrics.
		defer func() {
			c.Lock()
			defer c.Unlock()
			if wasCached {
				c.metrics.Hits++
			} else {
				c.metrics.Misses++
			}
			c.l.Printf("Request: %s, Cached: %v, Duration: %v", key, wasCached, time.Since(start))
		}()
		c.Lock()
		defer c.Unlock()
		if ele, found := c.findKey(key); found {
			entry := ele.Value.(*CacheEntry)
			if entry.lastAccessed.Add(c.ttl).After(time.Now()) &&
				entry.created.Add(c.maxTtl).After(time.Now()) {
				// valid, not expired
				c.l.Printf("Serving from cache: %s", key)
				c.serveFromCache(w, entry)
				wasCached = true
				return
			}
			// accessed an expired item, remove it
			c.l.Printf("Cache entry expired: %s, removing from cache", key)
			c.cacheList.Remove(ele)
		} else {
			c.l.Printf("Cache miss: %s", key)
		}
		// not in cache or expired: serve the request, then cache the response
		wr := &writerRecorder{ResponseWriter: w, statusCode: http.StatusOK}
		next.ServeHTTP(wr, r)
		if c.cacheFailures || wr.statusCode == http.StatusOK {
			go c.addToCache(key, wr.Result())
		}
	})
}

// Metrics returns a snapshot of the current cache metrics.
func (c *Cache) Metrics() *CacheMetrics {
	c.Lock()
	defer c.Unlock()
	return &CacheMetrics{
		Hits:          c.metrics.Hits,
		Misses:        c.metrics.Misses,
		Evictions:     c.metrics.Evictions,
		CurrentSize:   c.cacheList.Len(),
		CurrentMemory: calculateCacheMemory(c),
	}
}

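// A sketch of exposing the metrics on a debug endpoint; the path and the use
// of encoding/json are illustrative choices, not part of this package:
//
//	mux.HandleFunc("/debug/cache", func(w http.ResponseWriter, r *http.Request) {
//		json.NewEncoder(w).Encode(cache.Metrics())
//	})
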
// clearExpiredEntries periodically removes expired cache entries.
func (c *Cache) clearExpiredEntries() {
	ticker := time.NewTicker(c.evictionTimer)
	for range ticker.C {
		func() {
			c.Lock()
			defer c.Unlock()
			var next *list.Element
			for ele := c.cacheList.Front(); ele != nil; ele = next {
				// Capture the next element before removal: list.Remove zeroes
				// the element's links, so ele.Next() would end the loop early.
				next = ele.Next()
				entry := ele.Value.(*CacheEntry)
				if entry.lastAccessed.Add(c.ttl).Before(time.Now()) ||
					entry.created.Add(c.maxTtl).Before(time.Now()) {
					c.l.Printf("Cache entry expired: %s, removing from cache", entry.key)
					c.cacheList.Remove(ele)
				}
			}
		}()
	}
}

// generateKey builds the cache key from the request URL plus the selected filters.
// The URL is always included; the "Method" and "Header" filters append extra parts.
func (c *Cache) generateKey(r *http.Request) string {
	keyParts := []string{r.URL.String()}
	for _, filter := range c.cacheFilters {
		switch filter {
		case "Method":
			keyParts = append(keyParts, r.Method)
		case "Header":
			// Sort header names so identical requests always produce the same
			// key; ranging over the header map directly has a random order.
			names := make([]string, 0, len(r.Header))
			for k := range r.Header {
				names = append(names, k)
			}
			sort.Strings(names)
			for _, k := range names {
				for _, hv := range r.Header[k] {
					keyParts = append(keyParts, fmt.Sprintf("%s=%s", k, hv))
				}
			}
		}
	}
	return strings.Join(keyParts, "|")
}

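// Key format sketch (illustrative request, not taken from any test in this
// package): with cacheFilters = []string{"URL", "Method"}, a GET request for
// /users?page=2 produces the key "/users?page=2|GET".
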
// findKey finds a cache entry in the list based on its key.
func (c *Cache) findKey(key string) (*list.Element, bool) {
	for ele := c.cacheList.Front(); ele != nil; ele = ele.Next() {
		entry := ele.Value.(*CacheEntry)
		if entry.key == key {
			return ele, true
		}
	}
	return nil, false
}

// serveFromCache writes the cached response and renews the entry's last access time.
func (c *Cache) serveFromCache(w http.ResponseWriter, entry *CacheEntry) {
	entry.lastAccessed = time.Now()
	for k, v := range entry.value.Header {
		for _, hv := range v {
			w.Header().Add(k, hv)
		}
	}
	w.WriteHeader(entry.value.StatusCode)
	w.Write(entry.value.Body)
}

// addToCache adds a new response to the cache.
func (c *Cache) addToCache(key string, resp *http.Response) {
	cacheResp, cacheSize := cloneResponse(resp)
	entry := &CacheEntry{
		key:          key,
		value:        cacheResp,
		size:         cacheSize,
		created:      time.Now(),
		lastAccessed: time.Now(),
	}
	if entry.size > c.maxMemory {
		c.l.Printf("Response too large to cache: %s", key)
		return
	}
	c.Lock()
	defer c.Unlock()
	c.cacheList.PushFront(entry)
	c.checkEvictions()
}

// checkEvictions keeps the cache within its size and memory limits.
func (c *Cache) checkEvictions() {
	// Evict until the entry count is within maxSize, so a full cache can
	// actually hold maxSize entries.
	for c.cacheList.Len() > c.maxSize {
		c.l.Printf("Cache is full, evicting an item")
		c.evict()
	}
	// Evict until memory usage is within maxMemory.
	for calculateCacheMemory(c) > c.maxMemory && c.cacheList.Len() != 0 {
		c.l.Printf("Cache exceeds max memory, evicting an item")
		c.evict()
	}
}

// calculateCacheMemory sums the size of every entry currently in the cache.
func calculateCacheMemory(c *Cache) uint64 {
	var currentMemory uint64
	for ele := c.cacheList.Front(); ele != nil; ele = ele.Next() {
		entry := ele.Value.(*CacheEntry)
		currentMemory += entry.size
	}
	return currentMemory
}

// evict removes one entry from the cache based on the eviction policy.
func (c *Cache) evict() {
	var ele *list.Element
	switch c.evictionPolicy {
	case FIFO:
		// Entries are pushed to the front, so the back is the oldest insertion.
		ele = c.cacheList.Back()
	case LRU:
		var oldest time.Time
		for e := c.cacheList.Front(); e != nil; e = e.Next() {
			entry := e.Value.(*CacheEntry)
			if oldest.IsZero() || entry.lastAccessed.Before(oldest) {
				oldest = entry.lastAccessed
				ele = e
			}
		}
	default:
		// Unknown policy: fall back to evicting the oldest insertion so the
		// eviction loops in checkEvictions always make progress.
		ele = c.cacheList.Back()
	}
	if ele != nil {
		entry := ele.Value.(*CacheEntry)
		c.l.Printf("Evicting: %v", entry.key)
		c.cacheList.Remove(ele)
		c.metrics.Evictions++
	}
}

// cloneResponse copies the response into a CacheResponse and returns its size.
func cloneResponse(resp *http.Response) (*CacheResponse, uint64) {
	var buf bytes.Buffer
	if resp.Body != nil {
		_, err := io.Copy(&buf, resp.Body)
		if err != nil {
			log.Printf("Error copying response body: %v", err)
		}
		resp.Body.Close()
	}
	cacheResp := &CacheResponse{
		StatusCode: resp.StatusCode,
		Header:     resp.Header,
		Body:       buf.Bytes(),
	}
	return cacheResp, sizeOfResponse(cacheResp)
}

// sizeOfResponse approximates the in-memory size of a cached response in bytes.
func sizeOfResponse(resp *CacheResponse) uint64 {
	headerSize := uint64(0)
	for k, v := range resp.Header {
		// Count every value of the header, not just the first one.
		for _, hv := range v {
			headerSize += uint64(len(k) + len(hv))
		}
	}
	return uint64(len(resp.Body)) + headerSize
}

// writerRecorder captures the response as it is written so it can be cached,
// while still passing everything through to the client. Headers set by the
// handler go straight to the embedded ResponseWriter's header map, so they
// reach the client and can be cloned later for the cache.
type writerRecorder struct {
	http.ResponseWriter
	statusCode int
	body       *bytes.Buffer
}

func (wr *writerRecorder) WriteHeader(statusCode int) {
	wr.statusCode = statusCode
	wr.ResponseWriter.WriteHeader(statusCode)
}

func (wr *writerRecorder) Write(body []byte) (int, error) {
	if wr.body == nil {
		wr.body = new(bytes.Buffer)
	}
	wr.body.Write(body)
	return wr.ResponseWriter.Write(body)
}

// Result packages the recorded status, headers, and body as an *http.Response.
func (wr *writerRecorder) Result() *http.Response {
	body := wr.body
	if body == nil {
		// No Write call was made; cache an empty body instead of panicking.
		body = new(bytes.Buffer)
	}
	return &http.Response{
		StatusCode:    wr.statusCode,
		Header:        wr.ResponseWriter.Header().Clone(),
		Body:          io.NopCloser(bytes.NewReader(body.Bytes())),
		ContentLength: int64(body.Len()),
	}
}