added new cache and cacheutils packages
Signed-off-by: Miguel Ángel Ortuño <ortuman@gmail.com>
ortuman committed Dec 2, 2022
1 parent 45ea666 commit 4c73f60
Showing 23 changed files with 2,490 additions and 267 deletions.
74 changes: 74 additions & 0 deletions cache/cache.go
@@ -0,0 +1,74 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/thanos-io/thanos/blob/main/pkg/store/cache/cache.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Thanos Authors.

package cache

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/dskit/cacheutil"
)

// Cache is a generic interface.
type Cache interface {
	// Store data into the cache.
	//
	// Note that individual byte buffers may be retained by the cache!
	Store(ctx context.Context, data map[string][]byte, ttl time.Duration)

	// Fetch multiple keys from the cache. Returns a map of input keys to data.
	// If a key isn't in the map, data for that key was not found.
	Fetch(ctx context.Context, keys []string) map[string][]byte

	Name() string
}

const (
	BackendMemcached = "memcached"
)

type BackendConfig struct {
	Backend   string          `yaml:"backend"`
	Memcached MemcachedConfig `yaml:"memcached"`
}

// Validate the config.
func (cfg *BackendConfig) Validate() error {
	if cfg.Backend != "" && cfg.Backend != BackendMemcached {
		return fmt.Errorf("unsupported cache backend: %s", cfg.Backend)
	}

	if cfg.Backend == BackendMemcached {
		if err := cfg.Memcached.Validate(); err != nil {
			return err
		}
	}

	return nil
}

func CreateClient(cacheName string, cfg BackendConfig, logger log.Logger, reg prometheus.Registerer) (Cache, error) {
	switch cfg.Backend {
	case "":
		// No caching.
		return nil, nil

	case BackendMemcached:
		client, err := cacheutil.NewMemcachedClientWithConfig(logger, cacheName, cfg.Memcached.ToMemcachedClientConfig(), reg)
		if err != nil {
			return nil, errors.Wrapf(err, "failed to create memcached client")
		}
		return NewMemcachedCache(cacheName, logger, client, reg), nil

	default:
		return nil, errors.Errorf("unsupported cache type for cache %s: %s", cacheName, cfg.Backend)
	}
}
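
A minimal sketch of how a caller could create a cache from this config (illustrative, not part of this diff). The `github.com/grafana/dskit/cache` import path follows the new package location, the `example-cache` name is made up, and the backend is left empty because the memcached sub-config is defined in other files of this commit.

package main

import (
	"fmt"
	"os"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/dskit/cache"
)

func main() {
	// An empty Backend disables caching; cache.BackendMemcached selects the memcached
	// backend configured through the Memcached sub-config (defined elsewhere in this commit).
	cfg := cache.BackendConfig{}
	if err := cfg.Validate(); err != nil {
		fmt.Fprintln(os.Stderr, "invalid cache config:", err)
		os.Exit(1)
	}

	c, err := cache.CreateClient("example-cache", cfg, log.NewNopLogger(), prometheus.NewRegistry())
	if err != nil {
		fmt.Fprintln(os.Stderr, "failed to create cache client:", err)
		os.Exit(1)
	}
	if c == nil {
		// CreateClient returns a nil Cache when no backend is configured,
		// so callers have to treat "caching disabled" as a valid outcome.
		fmt.Println("caching disabled")
		return
	}
	fmt.Println("using cache backend:", c.Name())
}
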
103 changes: 103 additions & 0 deletions cache/compression.go
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/chunk/cache/snappy.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.

package cache

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"strings"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/golang/snappy"

	"github.com/grafana/dskit/util"
)

const (
	// CompressionSnappy is the value of the snappy compression.
	CompressionSnappy = "snappy"
)

var (
	supportedCompressions     = []string{CompressionSnappy}
	errUnsupportedCompression = errors.New("unsupported compression")
)

type CompressionConfig struct {
	Compression string `yaml:"compression"`
}

// RegisterFlagsWithPrefix registers flags with the provided prefix.
func (cfg *CompressionConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
	f.StringVar(&cfg.Compression, prefix+"compression", "", fmt.Sprintf("Enable cache compression, if not empty. Supported values are: %s.", strings.Join(supportedCompressions, ", ")))
}

func (cfg *CompressionConfig) Validate() error {
	if cfg.Compression != "" && !util.StringsContain(supportedCompressions, cfg.Compression) {
		return errUnsupportedCompression
	}

	return nil
}

func NewCompression(cfg CompressionConfig, next Cache, logger log.Logger) Cache {
	switch cfg.Compression {
	case CompressionSnappy:
		return NewSnappy(next, logger)
	default:
		// No compression.
		return next
	}
}

type snappyCache struct {
	next   Cache
	logger log.Logger
}

// NewSnappy makes a new snappy encoding cache wrapper.
func NewSnappy(next Cache, logger log.Logger) Cache {
	return &snappyCache{
		next:   next,
		logger: logger,
	}
}

// Store implements Cache.
func (s *snappyCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
	encoded := make(map[string][]byte, len(data))
	for key, value := range data {
		encoded[key] = snappy.Encode(nil, value)
	}

	s.next.Store(ctx, encoded, ttl)
}

// Fetch implements Cache.
func (s *snappyCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
	found := s.next.Fetch(ctx, keys)
	decoded := make(map[string][]byte, len(found))

	for key, encodedValue := range found {
		decodedValue, err := snappy.Decode(nil, encodedValue)
		if err != nil {
			level.Error(s.logger).Log("msg", "failed to decode cache entry", "err", err)
			continue
		}

		decoded[key] = decodedValue
	}

	return decoded
}

func (s *snappyCache) Name() string {
	return s.next.Name()
}
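
For illustration only (not part of this diff), a short sketch of wrapping a backend with snappy compression; it assumes the `github.com/grafana/dskit/cache` import path, an `example-cache.` flag prefix, and the `NewMockCache` helper that the tests below use as a stand-in backend.

package main

import (
	"context"
	"flag"
	"fmt"
	"os"
	"time"

	"github.com/go-kit/log"

	"github.com/grafana/dskit/cache"
)

func main() {
	// Register -example-cache.compression so the compression mode can be set via flags.
	var cfg cache.CompressionConfig
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlagsWithPrefix(fs, "example-cache.")
	_ = fs.Parse([]string{"-example-cache.compression=snappy"})

	if err := cfg.Validate(); err != nil {
		fmt.Fprintln(os.Stderr, "invalid compression config:", err)
		os.Exit(1)
	}

	// NewCompression returns the backend unchanged when no compression is configured,
	// or a snappy wrapper that encodes values on Store and decodes them on Fetch.
	backend := cache.NewMockCache()
	c := cache.NewCompression(cfg, backend, log.NewNopLogger())

	ctx := context.Background()
	c.Store(ctx, map[string][]byte{"key": []byte("value")}, time.Hour)
	fmt.Printf("%s\n", c.Fetch(ctx, []string{"key"})["key"])
}
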
70 changes: 70 additions & 0 deletions cache/compression_test.go
@@ -0,0 +1,70 @@
// SPDX-License-Identifier: AGPL-3.0-only

package cache

import (
	"context"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/assert"
)

func TestCompressionConfig_Validate(t *testing.T) {
	tests := map[string]struct {
		cfg      CompressionConfig
		expected error
	}{
		"should pass with default config": {
			cfg: CompressionConfig{},
		},
		"should pass with snappy compression": {
			cfg: CompressionConfig{
				Compression: "snappy",
			},
		},
		"should fail with unsupported compression": {
			cfg: CompressionConfig{
				Compression: "unsupported",
			},
			expected: errUnsupportedCompression,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			assert.Equal(t, testData.expected, testData.cfg.Validate())
		})
	}
}

func TestSnappyCache(t *testing.T) {
	ctx := context.Background()
	backend := NewMockCache()
	c := NewSnappy(backend, log.NewNopLogger())

	t.Run("Fetch() should return empty results if no key has been found", func(t *testing.T) {
		assert.Empty(t, c.Fetch(ctx, []string{"a", "b", "c"}))
	})

	t.Run("Fetch() should return previously set keys", func(t *testing.T) {
		expected := map[string][]byte{
			"a": []byte("value-a"),
			"b": []byte("value-b"),
		}

		c.Store(ctx, expected, time.Hour)
		assert.Equal(t, expected, c.Fetch(ctx, []string{"a", "b", "c"}))
	})

	t.Run("Fetch() should skip entries failing to decode", func(t *testing.T) {
		c.Store(ctx, map[string][]byte{"a": []byte("value-a")}, time.Hour)
		backend.Store(ctx, map[string][]byte{"b": []byte("value-b")}, time.Hour)

		expected := map[string][]byte{
			"a": []byte("value-a"),
		}
		assert.Equal(t, expected, c.Fetch(ctx, []string{"a", "b", "c"}))
	})
}
136 changes: 136 additions & 0 deletions cache/lru.go
@@ -0,0 +1,136 @@
// SPDX-License-Identifier: AGPL-3.0-only

package cache

import (
	"context"
	"sync"
	"time"

	lru "github.com/hashicorp/golang-lru/simplelru"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

type LRUCache struct {
	c          Cache
	defaultTTL time.Duration
	name       string

	mtx sync.Mutex
	lru *lru.LRU

	requests prometheus.Counter
	hits     prometheus.Counter
	items    prometheus.GaugeFunc
}

type Item struct {
	Data      []byte
	ExpiresAt time.Time
}

// WrapWithLRUCache wraps a given `Cache` c with an LRU cache. The LRU cache will always store items in both caches.
// However, it will only fetch items from the underlying cache if the LRU cache doesn't have the item.
// Items fetched from the underlying cache will be stored in the LRU cache with a default TTL.
// The LRU cache will also remove items from itself once they are expired.
// The LRU cache is limited in number of items using `lruSize`. This means this cache is not tailored for large items
// or items that have a big variation in size.
func WrapWithLRUCache(c Cache, name string, reg prometheus.Registerer, lruSize int, defaultTTL time.Duration) (*LRUCache, error) {
	lru, err := lru.NewLRU(lruSize, nil)
	if err != nil {
		return nil, err
	}

	cache := &LRUCache{
		c:          c,
		lru:        lru,
		name:       name,
		defaultTTL: defaultTTL,

		requests: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name:        "cortex_cache_memory_requests_total",
			Help:        "Total number of requests to the in-memory cache.",
			ConstLabels: map[string]string{"name": name},
		}),
		hits: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name:        "cortex_cache_memory_hits_total",
			Help:        "Total number of requests to the in-memory cache that were a hit.",
			ConstLabels: map[string]string{"name": name},
		}),
	}

	cache.items = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
		Name:        "cortex_cache_memory_items_count",
		Help:        "Total number of items currently in the in-memory cache.",
		ConstLabels: map[string]string{"name": name},
	}, func() float64 {
		cache.mtx.Lock()
		defer cache.mtx.Unlock()

		return float64(cache.lru.Len())
	})

	return cache, nil
}

func (l *LRUCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
	// Store the data in the shared cache.
	l.c.Store(ctx, data, ttl)

	l.mtx.Lock()
	defer l.mtx.Unlock()

	for k, v := range data {
		l.lru.Add(k, &Item{
			Data:      v,
			ExpiresAt: time.Now().Add(ttl),
		})
	}
}

func (l *LRUCache) Fetch(ctx context.Context, keys []string) (result map[string][]byte) {
	l.requests.Add(float64(len(keys)))
	l.mtx.Lock()
	defer l.mtx.Unlock()
	var (
		found = make(map[string][]byte, len(keys))
		miss  = make([]string, 0, len(keys))
		now   = time.Now()
	)

	for _, k := range keys {
		val, ok := l.lru.Get(k)
		if !ok {
			miss = append(miss, k)
			continue
		}
		item := val.(*Item)
		if item.ExpiresAt.After(now) {
			found[k] = item.Data
			continue
		}
		l.lru.Remove(k)
		miss = append(miss, k)
	}
	l.hits.Add(float64(len(found)))

	if len(miss) > 0 {
		result = l.c.Fetch(ctx, miss)
		for k, v := range result {
			// We don't know the TTL of the result, so we use the default one.
			l.lru.Add(k, &Item{
				Data:      v,
				ExpiresAt: now.Add(l.defaultTTL),
			})
			found[k] = v
		}
	}

	return found
}

func (l *LRUCache) Name() string {
	return "in-memory-" + l.name
}
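
A usage sketch for the LRU wrapper (illustrative, not part of this diff); as above it assumes the `github.com/grafana/dskit/cache` import path and uses `NewMockCache` in place of a real shared backend such as memcached, with made-up values for the name, size and default TTL.

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/grafana/dskit/cache"
)

func main() {
	reg := prometheus.NewRegistry()
	backend := cache.NewMockCache()

	// Keep at most 10,000 items in memory; items that come back from the backend get a
	// 5-minute TTL in the LRU layer because their remaining backend TTL is unknown.
	lruCache, err := cache.WrapWithLRUCache(backend, "example", reg, 10000, 5*time.Minute)
	if err != nil {
		fmt.Fprintln(os.Stderr, "failed to wrap cache:", err)
		os.Exit(1)
	}

	ctx := context.Background()
	lruCache.Store(ctx, map[string][]byte{"key": []byte("value")}, time.Hour)

	// Store writes to both the LRU and the backend, so this Fetch is served from memory;
	// hit and request counts are exported through the cortex_cache_memory_* metrics on reg.
	fmt.Printf("%s\n", lruCache.Fetch(ctx, []string{"key"})["key"])
}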