[WIP] Testing adding some new APIs #427

Open. Wants to merge 3 commits into base: main
109 changes: 76 additions & 33 deletions cache.go
@@ -84,6 +84,10 @@
// Metrics contains a running log of important statistics like hits, misses,
// and dropped items.
Metrics *Metrics
// Rate limiter for asynchronous Set calls.
rateLimiter int64
// Maximum number of in-flight asynchronous Set calls.
maxItems int64
}

// Config is passed to NewCache for creating new Cache instances.
@@ -147,6 +151,9 @@
// as well as on rejection of the value.
OnExit func(val V)

// ShouldUpdate is called when a value already exists in the cache and is about to be
// updated; cur is the incoming value and prev the stored one. Returning false keeps
// the existing value and skips the update.
ShouldUpdate func(cur, prev V) bool

// KeyToHash function is used to customize the key hashing algorithm.
// Each key will be hashed using the provided function. If keyToHash value
// is not set, the default keyToHash function is used.
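
A minimal usage sketch for the ShouldUpdate hook added a few lines above. It assumes the generic ristretto v2 API; the import path, the versioned type, and the sizes are illustrative, not part of this diff. Here only a strictly newer version may replace an existing entry:

package main

import (
	"fmt"

	"github.com/dgraph-io/ristretto/v2" // import path assumed for this sketch
)

// versioned is an illustrative value type carrying a monotonically increasing version.
type versioned struct {
	Version int
	Data    string
}

func main() {
	cache, err := ristretto.NewCache(&ristretto.Config[string, versioned]{
		NumCounters: 1e6,     // keys to track frequency for
		MaxCost:     1 << 20, // total cost budget
		BufferItems: 64,      // Get-buffer size; this PR also uses it to cap AsyncSet
		// cur is the incoming value, prev the stored one; returning false keeps prev.
		ShouldUpdate: func(cur, prev versioned) bool {
			return cur.Version > prev.Version
		},
	})
	if err != nil {
		panic(err)
	}
	defer cache.Close()

	cache.Set("cfg", versioned{Version: 2, Data: "new"}, 1)
	cache.Wait()
	cache.Set("cfg", versioned{Version: 1, Data: "stale"}, 1) // dropped by ShouldUpdate
	cache.Wait()
	if v, ok := cache.Get("cfg"); ok {
		fmt.Println(v.Version, v.Data) // prints: 2 new
	}
}
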
@@ -221,7 +228,7 @@
}
policy := newPolicy[V](config.NumCounters, config.MaxCost)
cache := &Cache[K, V]{
storedItems: newStore[V](),
storedItems: newStore[V](config.ShouldUpdate),
cachePolicy: policy,
getBuf: newRingBuffer(policy, config.BufferItems),
setBuf: make(chan *Item[V], setBufSize),
@@ -230,6 +237,8 @@
cost: config.Cost,
ignoreInternalCost: config.IgnoreInternalCost,
cleanupTicker: time.NewTicker(time.Duration(config.TtlTickerDurationInSec) * time.Second / 2),
maxItems: config.BufferItems,
rateLimiter: 0,
}
cache.onExit = func(val V) {
if config.OnExit != nil {
@@ -485,6 +494,71 @@
c.cachePolicy.UpdateMaxCost(maxCost)
}

func (c *Cache[K, V]) AsyncSet(key K, value V, cost int64) bool {
if atomic.AddInt64(&c.rateLimiter, 1) > c.maxItems {
atomic.AddInt64(&c.rateLimiter, -1) // Decrement on failure
return false
}

defer atomic.AddInt64(&c.rateLimiter, -1)

if c == nil || c.isClosed.Load() {
return false
}

GitHub Actions / lint — check failure on line 508 in cache.go: SA5011: possible nil pointer dereference (staticcheck)
var expiration time.Time
keyHash, conflictHash := c.keyToHash(key)

GitHub Actions / lint — check failure on line 510 in cache.go: SA5011 (related information): this check suggests that the pointer can be nil (staticcheck)
i := &Item[V]{
flag: itemNew,
Key: keyHash,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
}
c.insertItem(i, nil, nil)
return true
}
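
A hedged sketch of how a caller might use AsyncSet. Unlike Set, which queues the item on the set buffer, AsyncSet applies the write inline through insertItem and returns false once more than BufferItems calls are in flight or the cache is closed. The warmCache helper and its types are hypothetical and assume the same ristretto cache type as above:

// Hypothetical helper, not part of this diff: bulk-load values through AsyncSet
// and report how many writes the in-flight limiter refused.
func warmCache(c *ristretto.Cache[string, []byte], blobs map[string][]byte) (dropped int) {
	for k, v := range blobs {
		if !c.AsyncSet(k, v, int64(len(v))) {
			dropped++ // limiter saturated or cache closed; caller could fall back to Set
		}
	}
	return dropped
}
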

func (c *Cache[K, V]) insertItem(i *Item[V], trackAdmission func(uint64), onEvict func(*Item[V])) {
// Calculate item cost value if new or update.
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
i.Cost = c.cost(i.Value)
}
if !c.ignoreInternalCost {
// Add the cost of internally storing the object.
i.Cost += itemSize
}

switch i.flag {
case itemNew:
victims, added := c.cachePolicy.Add(i.Key, i.Cost)
if added {
c.storedItems.Set(i)
c.Metrics.add(keyAdd, i.Key, 1)
if trackAdmission != nil {
trackAdmission(i.Key)
}
} else {
c.onReject(i)
}
for _, victim := range victims {
victim.Conflict, victim.Value = c.storedItems.Del(victim.Key, 0)
if onEvict != nil {
onEvict(victim)
}
}

case itemUpdate:
c.cachePolicy.Update(i.Key, i.Cost)

case itemDelete:
c.cachePolicy.Del(i.Key) // Deals with metrics updates.
_, val := c.storedItems.Del(i.Key, i.Conflict)
c.onExit(val)
}
}

// processItems is run by goroutines processing the Set buffer.
func (c *Cache[K, V]) processItems() {
startTs := make(map[uint64]time.Time)
@@ -521,38 +595,7 @@
i.wg.Done()
continue
}
// Calculate item cost value if new or update.
if i.Cost == 0 && c.cost != nil && i.flag != itemDelete {
i.Cost = c.cost(i.Value)
}
if !c.ignoreInternalCost {
// Add the cost of internally storing the object.
i.Cost += itemSize
}

switch i.flag {
case itemNew:
victims, added := c.cachePolicy.Add(i.Key, i.Cost)
if added {
c.storedItems.Set(i)
c.Metrics.add(keyAdd, i.Key, 1)
trackAdmission(i.Key)
} else {
c.onReject(i)
}
for _, victim := range victims {
victim.Conflict, victim.Value = c.storedItems.Del(victim.Key, 0)
onEvict(victim)
}

case itemUpdate:
c.cachePolicy.Update(i.Key, i.Cost)

case itemDelete:
c.cachePolicy.Del(i.Key) // Deals with metrics updates.
_, val := c.storedItems.Del(i.Key, i.Conflict)
c.onExit(val)
}
c.insertItem(i, trackAdmission, onEvict)
case <-c.cleanupTicker.C:
c.storedItems.Cleanup(c.cachePolicy, onEvict)
case <-c.stop:
47 changes: 31 additions & 16 deletions store.go
@@ -21,6 +21,8 @@ import (
"time"
)

type updateFn[V any] func(cur, prev V) bool

// TODO: Do we need this to be a separate struct from Item?
type storeItem[V any] struct {
key uint64
@@ -56,24 +58,31 @@ type store[V any] interface {
}

// newStore returns the default store implementation.
func newStore[V any]() store[V] {
return newShardedMap[V]()
func newStore[V any](f updateFn[V]) store[V] {
return newShardedMap[V](f)
}

const numShards uint64 = 256

type shardedMap[V any] struct {
shards []*lockedMap[V]
expiryMap *expirationMap[V]
shards []*lockedMap[V]
expiryMap *expirationMap[V]
shouldUpdate updateFn[V]
}

func newShardedMap[V any]() *shardedMap[V] {
func newShardedMap[V any](f updateFn[V]) *shardedMap[V] {
if f == nil {
f = func(cur, prev V) bool {
return true
}
}
sm := &shardedMap[V]{
shards: make([]*lockedMap[V], int(numShards)),
expiryMap: newExpirationMap[V](),
shards: make([]*lockedMap[V], int(numShards)),
expiryMap: newExpirationMap[V](),
shouldUpdate: f,
}
for i := range sm.shards {
sm.shards[i] = newLockedMap[V](sm.expiryMap)
sm.shards[i] = newLockedMap[V](sm.expiryMap, f)
}
return sm
}
@@ -116,14 +125,16 @@ func (sm *shardedMap[V]) Clear(onEvict func(item *Item[V])) {

type lockedMap[V any] struct {
sync.RWMutex
data map[uint64]storeItem[V]
em *expirationMap[V]
data map[uint64]storeItem[V]
em *expirationMap[V]
shouldUpdate updateFn[V]
}

func newLockedMap[V any](em *expirationMap[V]) *lockedMap[V] {
func newLockedMap[V any](em *expirationMap[V], f updateFn[V]) *lockedMap[V] {
return &lockedMap[V]{
data: make(map[uint64]storeItem[V]),
em: em,
data: make(map[uint64]storeItem[V]),
em: em,
shouldUpdate: f,
}
}

@@ -167,6 +178,9 @@ func (m *lockedMap[V]) Set(i *Item[V]) {
if i.Conflict != 0 && (i.Conflict != item.conflict) {
return
}
if !m.shouldUpdate(i.Value, item.value) {
return
}
m.em.update(i.Key, i.Conflict, item.expiration, i.Expiration)
} else {
// The value is not in the map already. There's no need to return anything.
@@ -205,15 +219,17 @@ func (m *lockedMap[V]) Del(key, conflict uint64) (uint64, V) {

func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) {
m.Lock()
defer m.Unlock()
item, ok := m.data[newItem.Key]
if !ok {
m.Unlock()
return zeroValue[V](), false
}
if newItem.Conflict != 0 && (newItem.Conflict != item.conflict) {
m.Unlock()
return zeroValue[V](), false
}
if !m.shouldUpdate(newItem.Value, item.value) {
return item.value, false
}

m.em.update(newItem.Key, newItem.Conflict, item.expiration, newItem.Expiration)
m.data[newItem.Key] = storeItem[V]{
@@ -223,7 +239,6 @@ func (m *lockedMap[V]) Update(newItem *Item[V]) (V, bool) {
expiration: newItem.Expiration,
}

m.Unlock()
return item.value, true
}
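
A possible store-level test for the new hook, in the style of the existing tests below (a sketch only; it assumes it would live in store_test.go, where testing and z are already imported). A shouldUpdate function that always returns false must leave the first stored value untouched:

func TestStoreShouldUpdate(t *testing.T) {
	// Reject every update: the first value written for a key should stick.
	s := newStore[int](func(cur, prev int) bool { return false })
	key, conflict := z.KeyToHash(1)
	s.Set(&Item[int]{Key: key, Conflict: conflict, Value: 1})
	s.Set(&Item[int]{Key: key, Conflict: conflict, Value: 2}) // dropped by shouldUpdate
	val, ok := s.Get(key, conflict)
	if !ok || val != 1 {
		t.Fatalf("expected original value 1, got %d (found=%v)", val, ok)
	}
}
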

18 changes: 9 additions & 9 deletions store_test.go
@@ -9,7 +9,7 @@ import (
)

func TestStoreSetGet(t *testing.T) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,
@@ -40,7 +40,7 @@ func TestStoreSetGet(t *testing.T) {
}

func TestStoreDel(t *testing.T) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,
@@ -57,7 +57,7 @@ func TestStoreDel(t *testing.T) {
}

func TestStoreClear(t *testing.T) {
s := newStore[uint64]()
s := newStore[uint64](nil)
for i := uint64(0); i < 1000; i++ {
key, conflict := z.KeyToHash(i)
it := Item[uint64]{
@@ -77,7 +77,7 @@ func TestStoreClear(t *testing.T) {
}

func TestStoreUpdate(t *testing.T) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,
@@ -119,7 +119,7 @@ func TestStoreUpdate(t *testing.T) {
}

func TestStoreCollision(t *testing.T) {
s := newShardedMap[int]()
s := newShardedMap[int](nil)
s.shards[1].Lock()
s.shards[1].data[1] = storeItem[int]{
key: 1,
@@ -154,7 +154,7 @@ func TestStoreCollision(t *testing.T) {
}

func TestStoreExpiration(t *testing.T) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
expiration := time.Now().Add(time.Second)
i := Item[int]{
@@ -184,7 +184,7 @@ func TestStoreExpiration(t *testing.T) {
}

func BenchmarkStoreGet(b *testing.B) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,
@@ -201,7 +201,7 @@ func BenchmarkStoreGet(b *testing.B) {
}

func BenchmarkStoreSet(b *testing.B) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
b.SetBytes(1)
b.RunParallel(func(pb *testing.PB) {
@@ -217,7 +217,7 @@ func BenchmarkStoreSet(b *testing.B) {
}

func BenchmarkStoreUpdate(b *testing.B) {
s := newStore[int]()
s := newStore[int](nil)
key, conflict := z.KeyToHash(1)
i := Item[int]{
Key: key,