Skip to content

Commit

Permalink
Merge pull request #817 from fluxcd/refactor-cache
Browse files Browse the repository at this point in the history
Simplify the cache
  • Loading branch information
darkowlzz authored Dec 5, 2024
2 parents 9e2947d + a29e42f commit b83bd25
Show file tree
Hide file tree
Showing 19 changed files with 506 additions and 1,669 deletions.
160 changes: 48 additions & 112 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,17 @@ const (
defaultInterval = time.Minute
)

// Cache[T] is a thread-safe in-memory key/object store.
// It can be used to store objects with optional expiration.
// A function to extract the key from the object must be provided.
// Use the New function to create a new cache that is ready to use.
// Cache[T] is a thread-safe in-memory key/value store.
// It can be used to store items with optional expiration.
type Cache[T any] struct {
*cache[T]
// keyFunc is used to make the key for objects stored in and retrieved from index, and
// should be deterministic.
keyFunc KeyFunc[T]
}

// item is an item stored in the cache.
type item[T any] struct {
key string
// object is the item's object.
object T
// value is the item's value.
value T
// expiresAt is the item's expiration time.
expiresAt time.Time
}
Expand All @@ -61,41 +56,39 @@ type cache[T any] struct {
// It is initially true, and set to false when the items are not sorted.
sorted bool
// capacity is the maximum number of index the cache can hold.
capacity int
metrics *cacheMetrics
labelsFunc GetLvsFunc[T]
janitor *janitor[T]
closed bool
capacity int
metrics *cacheMetrics
janitor *janitor[T]
closed bool

mu sync.RWMutex
}

var _ Expirable[any] = &Cache[any]{}

// New creates a new cache with the given configuration.
func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T], error) {
func New[T any](capacity int, opts ...Options) (*Cache[T], error) {
opt, err := makeOptions(opts...)
if err != nil {
return nil, fmt.Errorf("failed to apply options: %w", err)
}

c := &cache[T]{
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
labelsFunc: opt.labelsFunc,
index: make(map[string]*item[T]),
items: make([]*item[T], 0, capacity),
sorted: true,
capacity: capacity,
janitor: &janitor[T]{
interval: opt.interval,
stop: make(chan bool),
},
}

if opt.registerer != nil {
c.metrics = newCacheMetrics(opt.registerer, opt.extraLabels...)
c.metrics = newCacheMetrics(opt.metricsPrefix, opt.registerer)
}

C := &Cache[T]{cache: c, keyFunc: keyFunc}
C := &Cache[T]{cache: c}

if opt.interval > 0 {
go c.janitor.run(c)
Expand All @@ -104,8 +97,8 @@ func New[T any](capacity int, keyFunc KeyFunc[T], opts ...Options[T]) (*Cache[T]
return C, nil
}

func makeOptions[T any](opts ...Options[T]) (*storeOptions[T], error) {
opt := storeOptions[T]{}
func makeOptions(opts ...Options) (*storeOptions, error) {
opt := storeOptions{}
for _, o := range opts {
err := o(&opt)
if err != nil {
Expand All @@ -131,14 +124,8 @@ func (c *Cache[T]) Close() error {
}

// Set an item in the cache, existing index will be overwritten.
// If the cache is full, Add will return an error.
func (c *Cache[T]) Set(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

// If the cache is full, an error is returned.
func (c *Cache[T]) Set(key string, value T) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -147,14 +134,14 @@ func (c *Cache[T]) Set(object T) error {
}
_, found := c.index[key]
if found {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
return nil
}

if c.capacity > 0 && len(c.index) < c.capacity {
c.set(key, object)
c.set(key, value)
c.mu.Unlock()
recordRequest(c.metrics, StatusSuccess)
recordItemIncrement(c.metrics)
Expand All @@ -165,10 +152,10 @@ func (c *Cache[T]) Set(object T) error {
return ErrCacheFull
}

func (c *cache[T]) set(key string, object T) {
func (c *cache[T]) set(key string, value T) {
item := item[T]{
key: key,
object: object,
value: value,
expiresAt: time.Now().Add(noExpiration),
}

Expand All @@ -181,86 +168,40 @@ func (c *cache[T]) set(key string, object T) {
c.items = append(c.items, &item)
}

// Get an item from the cache. Returns the item or nil, and a bool indicating
// whether the key was found.
func (c *Cache[T]) Get(object T) (item T, exists bool, err error) {
var res T
lvs := []string{}
if c.labelsFunc != nil {
lvs, err = c.labelsFunc(object, len(c.metrics.getExtraLabels()))
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidLabels, Err: err}
}
}
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return res, false, &CacheError{Reason: ErrInvalidKey, Err: err}
}
item, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss, lvs...)
return res, false, nil
}
recordEvent(c.metrics, CacheEventTypeHit, lvs...)
return item, true, nil
}

// GetByKey returns the object for the given key.
func (c *Cache[T]) GetByKey(key string) (T, bool, error) {
var res T
index, found, err := c.get(key)
if err != nil {
return res, false, err
}
if !found {
recordEvent(c.metrics, CacheEventTypeMiss)
return res, false, nil
}

recordEvent(c.metrics, CacheEventTypeHit)
return index, true, nil
}

func (c *cache[T]) get(key string) (T, bool, error) {
// Get returns an item in the cache for the given key. If no item is found, an
// error is returned.
// The caller can record cache hit or miss based on the result with
// Cache.RecordCacheEvent().
func (c *Cache[T]) Get(key string) (T, error) {
var res T
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
recordRequest(c.metrics, StatusFailure)
return res, false, ErrCacheClosed
return res, ErrCacheClosed
}
item, found := c.index[key]
if !found {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return res, ErrNotFound
}
if !item.expiresAt.IsZero() {
if item.expiresAt.Compare(time.Now()) < 0 {
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return res, false, nil
return res, ErrNotFound
}
}
c.mu.RUnlock()
recordRequest(c.metrics, StatusSuccess)
return item.object, true, nil
return item.value, nil
}

// Delete an item from the cache. Does nothing if the key is not in the cache.
// It actually sets the item expiration to `now“, so that it will be deleted at
// the cleanup.
func (c *Cache[T]) Delete(object T) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) Delete(key string) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand Down Expand Up @@ -355,13 +296,7 @@ func (c *cache[T]) Resize(size int) (int, error) {
}

// HasExpired returns true if the item has expired.
func (c *Cache[T]) HasExpired(object T) (bool, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return false, &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) HasExpired(key string) (bool, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand All @@ -387,13 +322,7 @@ func (c *Cache[T]) HasExpired(object T) (bool, error) {
}

// SetExpiration sets the expiration for the given key.
func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return &CacheError{Reason: ErrInvalidKey, Err: err}
}

func (c *Cache[T]) SetExpiration(key string, expiration time.Time) error {
c.mu.Lock()
if c.closed {
c.mu.Unlock()
Expand All @@ -417,12 +346,7 @@ func (c *Cache[T]) SetExpiration(object T, expiration time.Time) error {
// GetExpiration returns the expiration for the given key.
// Returns zero if the key is not in the cache or the item
// has already expired.
func (c *Cache[T]) GetExpiration(object T) (time.Time, error) {
key, err := c.keyFunc(object)
if err != nil {
recordRequest(c.metrics, StatusFailure)
return time.Time{}, &CacheError{Reason: ErrInvalidKey, Err: err}
}
func (c *Cache[T]) GetExpiration(key string) (time.Time, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
Expand Down Expand Up @@ -481,6 +405,18 @@ func (c *cache[T]) deleteExpired() {
c.mu.Unlock()
}

// RecordCacheEvent records a cache event (cache_miss or cache_hit) with kind,
// name and namespace of the associated object being reconciled.
func (c *Cache[T]) RecordCacheEvent(event, kind, name, namespace string) {
recordCacheEvent(c.metrics, event, kind, name, namespace)
}

// DeleteCacheEvent deletes the cache event (cache_miss or cache_hit) metric for
// the associated object being reconciled, given their kind, name and namespace.
func (c *Cache[T]) DeleteCacheEvent(event, kind, name, namespace string) {
deleteCacheEvent(c.metrics, event, kind, name, namespace)
}

type janitor[T any] struct {
interval time.Duration
stop chan bool
Expand Down
Loading

0 comments on commit b83bd25

Please sign in to comment.