Skip to content

Commit

Permalink
feat: throttling (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula authored Apr 13, 2023
1 parent 940da1a commit 2423d26
Show file tree
Hide file tree
Showing 14 changed files with 1,568 additions and 8 deletions.
131 changes: 131 additions & 0 deletions cachettl/cachettl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package cachettl

import (
"sync"
"time"
)

// Cache is a double linked list sorted by expiration time (ascending order)
// the root (head) node is the node with the lowest expiration time
// the tail node (end) is the node with the highest expiration time
// Cleanups are done on Get() calls so if Get() is never invoked then Nodes stay in-memory.
type Cache[K comparable, V any] struct {
root *node[K, V]
mu sync.Mutex
m map[K]*node[K, V]
now func() time.Time
}

type node[K comparable, V any] struct {
key K
value V
prev *node[K, V]
next *node[K, V]
ttl time.Duration
expiration time.Time
}

func (n *node[K, V]) remove() {
n.prev.next = n.next
n.next.prev = n.prev
}

// New returns a new Cache.
func New[K comparable, V any]() *Cache[K, V] {
return &Cache[K, V]{
now: time.Now,
root: &node[K, V]{},
m: make(map[K]*node[K, V]),
}
}

// Get returns the value associated with the key or nil otherwise.
// Additionally, Get() will refresh the TTL and cleanup expired nodes.
func (c *Cache[K, V]) Get(key K) (zero V) {
c.mu.Lock()
defer c.mu.Unlock()

defer func() { // remove expired nodes
cn := c.root.next // start from head since we're sorting by expiration with the highest expiration at the tail
for cn != nil && cn != c.root {
if c.now().After(cn.expiration) {
cn.remove() // removes a node from the linked list (leaves the map untouched)
delete(c.m, cn.key) // remove node from map too
} else { // there is nothing else to clean up, no need to iterate further
break
}
cn = cn.next
}
}()

if n, ok := c.m[key]; ok && n.expiration.After(c.now()) {
n.remove()
n.expiration = c.now().Add(n.ttl) // refresh TTL
c.add(n)
return n.value
}
return zero
}

// Put adds or updates an element inside the Cache.
// The Cache will be sorted with the node with the highest expiration at the tail.
func (c *Cache[K, V]) Put(key K, value V, ttl time.Duration) {
c.mu.Lock()
defer c.mu.Unlock()

now := c.now()

n, ok := c.m[key]
if !ok {
n = &node[K, V]{
key: key, value: value, ttl: ttl, expiration: now.Add(ttl),
}
c.m[key] = n
} else {
n.value = value
n.expiration = now.Add(ttl)
}

if c.root.next == nil { // first node insertion
c.root.next = n
c.root.prev = n
n.prev = c.root
n.next = c.root
return
}

if ok { // removes a node from the linked list (leaves the map untouched)
n.remove()
}

c.add(n)
}

func (c *Cache[K, V]) add(n *node[K, V]) {
cn := c.root.prev // tail
for cn != nil { // iterate from tail to root because we have expiring nodes towards the tail
if n.expiration.After(cn.expiration) || n.expiration.Equal(cn.expiration) {
// insert node after cn
save := cn.next
cn.next = n
n.prev = cn
n.next = save
save.prev = n
break
}
cn = cn.prev
}
}

// slice is used for debugging purposes only
func (c *Cache[K, V]) slice() (s []V) {
c.mu.Lock()
defer c.mu.Unlock()

cn := c.root.next
for cn != nil && cn != c.root {
s = append(s, cn.value)
cn = cn.next
}
return
}
88 changes: 88 additions & 0 deletions cachettl/cachettl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cachettl

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestCacheTTL(t *testing.T) {
now := time.Now()

c := New[string, string]()
c.now = func() time.Time { return now }

// nothing done so far, we expect the cache to be empty
require.Nil(t, c.slice())

// insert the very first value
c.Put("two", "222", 2)
require.Equal(t, []string{"222"}, c.slice())

// insert the second value with an expiration higher than the first one
c.Put("three", "333", 3)
require.Equal(t, []string{"222", "333"}, c.slice())

// insert the third value with an expiration lower than all other values
c.Put("one", "111", 1)
require.Equal(t, []string{"111", "222", "333"}, c.slice())

// update "111" to have a higher expiration than all values
c.Put("one", "111", 4)
require.Equal(t, []string{"222", "333", "111"}, c.slice())

// update "333" to have a higher expiration than all values
c.Put("three", "333", 5)
require.Equal(t, []string{"222", "111", "333"}, c.slice())

// move time forward to expire "222"
c.now = func() time.Time { return now.Add(1) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"222", "111", "333"}, c.slice())

c.now = func() time.Time { return now.Add(2) } // "222" should still be there
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"222", "111", "333"}, c.slice())

c.now = func() time.Time { return now.Add(3) } // "222" should be expired!
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Equal(t, []string{"111", "333"}, c.slice())

// let's move a lot forward to expire everything
c.now = func() time.Time { return now.Add(6) }
require.Empty(t, c.Get("whatever")) // trigger the cleanup
require.Nil(t, c.slice())
require.Len(t, c.m, 0)

// now let's set a key, then move forward and get it directly without triggering with a different key
c.now = func() time.Time { return now }
c.Put("last", "999", 1)
require.Equal(t, "999", c.Get("last"))
require.Equal(t, []string{"999"}, c.slice())
c.now = func() time.Time { return now.Add(2) }
require.Empty(t, c.Get("last")) // trigger the cleanup
require.Nil(t, c.slice())
require.Len(t, c.m, 0)
}

func TestRefreshTTL(t *testing.T) {
c := New[string, string]()

// nothing done so far, we expect the cache to be empty
require.Nil(t, c.slice())

c.Put("one", "111", time.Second)
c.Put("two", "222", time.Second)
c.Put("three", "333", time.Second)
require.Equal(t, []string{"111", "222", "333"}, c.slice())

require.Equal(t, "111", c.Get("one"))
require.Equal(t, []string{"222", "333", "111"}, c.slice())

require.Equal(t, "222", c.Get("two"))
require.Equal(t, []string{"333", "111", "222"}, c.slice())

require.Equal(t, "333", c.Get("three"))
require.Equal(t, []string{"111", "222", "333"}, c.slice())
}
9 changes: 7 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/cenkalti/backoff/v4 v4.2.0
github.com/fsnotify/fsnotify v1.6.0
github.com/go-chi/chi/v5 v5.0.8
github.com/go-redis/redis/v8 v8.11.5
github.com/golang/mock v1.6.0
github.com/gorilla/mux v1.8.0
github.com/joho/godotenv v1.5.1
Expand All @@ -18,6 +19,7 @@ require (
github.com/spf13/cast v1.5.0
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.2
github.com/throttled/throttled/v2 v2.11.0
github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04
go.opentelemetry.io/otel v1.14.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.37.0
Expand All @@ -41,6 +43,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/docker/cli v20.10.14+incompatible // indirect
github.com/docker/docker v20.10.24+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
Expand All @@ -51,12 +54,14 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect
github.com/onsi/gomega v1.27.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc2 // indirect
github.com/opencontainers/runc v1.1.5 // indirect
Expand All @@ -78,11 +83,11 @@ require (
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.7.0 // indirect
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f // indirect
google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44 // indirect
google.golang.org/grpc v1.53.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
Loading

0 comments on commit 2423d26

Please sign in to comment.