feat(shwap): Add caching to blockstore (#3615)
walldiss authored Aug 7, 2024
1 parent 5bb844a commit 5f6fdab
Showing 8 changed files with 187 additions and 46 deletions.
4 changes: 3 additions & 1 deletion store/cache/accessor_cache.go
@@ -152,7 +152,9 @@ func (bc *AccessorCache) Remove(height uint64) error {

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() (unreg func() error, err error) {
	bc.metrics, err = newMetrics(bc)
	if bc.metrics == nil {
		bc.metrics, err = newMetrics(bc)
	}
	return bc.metrics.reg.Unregister, err
}

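A minimal sketch of what the new guard allows, using only the exported cache API shown in this diff (the cache name, size, and helper name are arbitrary): EnableMetrics can now be called more than once on the same cache, and only the first call creates and registers the metrics.

package cacheexample // hypothetical package, for illustration only

import "github.com/celestiaorg/celestia-node/store/cache"

func enableMetricsTwice() error {
	c, err := cache.NewAccessorCache("recent", 10)
	if err != nil {
		return err
	}
	// first call creates and registers the metrics
	unreg, err := c.EnableMetrics()
	if err != nil {
		return err
	}
	// a repeated call reuses the existing metrics instead of registering a second set
	if _, err := c.EnableMetrics(); err != nil {
		return err
	}
	return unreg()
}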
17 changes: 17 additions & 0 deletions store/cache/doublecache.go
@@ -1,6 +1,7 @@
package cache

import (
"context"
"errors"
"fmt"

@@ -29,6 +30,22 @@ func (mc *DoubleCache) Get(height uint64) (eds.AccessorStreamer, error) {
	return mc.second.Get(height)
}

// GetOrLoad attempts to get an item from both caches and, if not found, invokes
// the provided loader function to load it into the first Cache.
func (mc *DoubleCache) GetOrLoad(
	ctx context.Context,
	height uint64,
	loader OpenAccessorFn,
) (eds.AccessorStreamer, error) {
	// look up in the second cache first
	accessor, err := mc.second.Get(height)
	if err == nil {
		return accessor, nil
	}
	// not found in second, get or load from first one
	return mc.first.GetOrLoad(ctx, height, loader)
}

// Remove removes an item from all underlying caches
func (mc *DoubleCache) Remove(height uint64) error {
	err1 := mc.first.Remove(height)
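A sketch of the lookup order, assuming the cache names and sizes used elsewhere in this commit and a caller-supplied loader: GetOrLoad checks the long-lived second cache first and only falls back to loading into the first cache on a miss.

package cacheexample // hypothetical package, for illustration only

import (
	"context"

	eds "github.com/celestiaorg/celestia-node/share/new_eds"
	"github.com/celestiaorg/celestia-node/store/cache"
)

func getOrLoad(ctx context.Context, height uint64, loader cache.OpenAccessorFn) (eds.AccessorStreamer, error) {
	recent, err := cache.NewAccessorCache("recent", 10)
	if err != nil {
		return nil, err
	}
	availability, err := cache.NewAccessorCache("availability", 128)
	if err != nil {
		return nil, err
	}
	dc := cache.NewDoubleCache(recent, availability)

	// a hit in the availability (second) cache returns immediately; otherwise
	// loader is invoked and the result is stored in the recent (first) cache
	return dc.GetOrLoad(ctx, height, loader)
}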
21 changes: 15 additions & 6 deletions store/metrics.go
@@ -8,6 +8,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/store/cache"
)

const (
@@ -57,19 +59,26 @@ func (s *Store) WithMetrics() error {
		return err
	}

	unreg, err := s.cache.EnableMetrics()
	if err != nil {
		return fmt.Errorf("while enabling metrics for cache: %w", err)
	}

	s.metrics = &metrics{
		put:       put,
		putExists: putExists,
		get:       get,
		has:       has,
		remove:    remove,
		unreg:     unreg,
	}
	return s.metrics.addCacheMetrics(s.cache)
}

// addCacheMetrics adds cache metrics to store metrics
func (m *metrics) addCacheMetrics(c cache.Cache) error {
	if m == nil {
		return nil
	}
	unreg, err := c.EnableMetrics()
	if err != nil {
		return fmt.Errorf("while enabling metrics for cache: %w", err)
	}
	m.unreg = unreg
	return nil
}

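A small sketch of what the nil-receiver check buys, written as if it lived inside the store package itself (the helper name is hypothetical and not part of the commit): addCacheMetrics is a safe no-op when store metrics were never enabled, which is what lets WithCache below wire up cache metrics unconditionally.

package store // sketch assumes it sits in the store package

import "github.com/celestiaorg/celestia-node/store/cache"

// metricsNeverEnabled is a hypothetical illustration, not part of the commit.
func metricsNeverEnabled() error {
	var m *metrics // WithMetrics was never called, so the receiver is nil
	// returns nil and registers nothing, thanks to the nil check above
	return m.addCacheMetrics(cache.NoopCache{})
}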
20 changes: 10 additions & 10 deletions store/store.go
@@ -43,7 +43,7 @@ type Store struct {
	// basepath is the root directory of the store
	basepath string
	// cache is used to cache recent blocks and blocks that are accessed frequently
	cache *cache.DoubleCache
	cache cache.Cache
	// stripedLocks is used to synchronize parallel operations
	stripLock *striplock
	metrics *metrics
@@ -74,19 +74,19 @@ func NewStore(params *Parameters, basePath string) (*Store, error) {
return nil, fmt.Errorf("creating empty file: %w", err)
}

	recentEDSCache, err := cache.NewAccessorCache("recent", params.RecentBlocksCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create recent eds cache: %w", err)
	}
	var recentCache cache.Cache
	recentCache = cache.NoopCache{}
	if params.RecentBlocksCacheSize > 0 {
		recentCache, err = cache.NewAccessorCache("recent", params.RecentBlocksCacheSize)
		if err != nil {
			return nil, fmt.Errorf("failed to create recent eds cache: %w", err)
		}

	availabilityCache, err := cache.NewAccessorCache("availability", params.AvailabilityCacheSize)
	if err != nil {
		return nil, fmt.Errorf("failed to create availability cache: %w", err)
	}

	store := &Store{
		basepath:  basePath,
		cache:     cache.NewDoubleCache(recentEDSCache, availabilityCache),
		cache:     recentCache,
		stripLock: newStripLock(1024),
	}
	return store, nil
@@ -114,7 +114,7 @@ func (s *Store) Put(

	// put to cache before writing to make it accessible while write is happening
	accessor := &eds.Rsmt2D{ExtendedDataSquare: square}
	acc, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
	acc, err := s.cache.GetOrLoad(ctx, height, accessorLoader(accessor))
	if err != nil {
		log.Warnf("failed to put Accessor in the recent cache: %s", err)
	} else {
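A sketch of the new construction path, with the base path and helper name assumed: setting RecentBlocksCacheSize to zero now swaps the recent-blocks cache for cache.NoopCache{} instead of returning an error, so the store can run with caching fully disabled.

package storeexample // hypothetical package, for illustration only

import "github.com/celestiaorg/celestia-node/store"

func newStoreWithoutCache(basePath string) (*store.Store, error) {
	params := store.DefaultParameters()
	// zero disables the recent-blocks cache; NewStore falls back to a no-op cache
	params.RecentBlocksCacheSize = 0
	return store.NewStore(params, basePath)
}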
54 changes: 54 additions & 0 deletions store/store_cache.go
@@ -0,0 +1,54 @@
package store

import (
"context"
"fmt"

eds "github.com/celestiaorg/celestia-node/share/new_eds"
"github.com/celestiaorg/celestia-node/store/cache"
)

// CachedStore is a store with an additional cache layer. The new cache layer is created on top of
// the original store cache. The parent store cache can read from the new cache layer, but cannot
// write to it, keeping the parent store cache and the CachedStore cache independent for writes.
type CachedStore struct {
	store         *Store
	combinedCache *cache.DoubleCache
}

// WithCache wraps the store with an extra cache layer. The created caching layer has read access
// to the original store cache and will duplicate its content. It updates the parent store cache
// to allow it to read from the additionally created cache layer.
func (s *Store) WithCache(name string, size int) (*CachedStore, error) {
	newCache, err := cache.NewAccessorCache(name, size)
	if err != nil {
		return nil, fmt.Errorf("failed to create %s cache: %w", name, err)
	}

	wrappedCache := cache.NewDoubleCache(s.cache, newCache)
	s.metrics.addCacheMetrics(wrappedCache)
	// update parent store cache to allow it to read from both caches
	s.cache = wrappedCache
	return &CachedStore{
		store:         s,
		combinedCache: wrappedCache,
	}, nil
}

// GetByHeight returns accessor for given height and puts it into cache.
func (cs *CachedStore) GetByHeight(ctx context.Context, height uint64) (eds.AccessorStreamer, error) {
	acc, err := cs.combinedCache.First().Get(height)
	if err == nil {
		return acc, err
	}
	return cs.combinedCache.Second().GetOrLoad(ctx, height, cs.openFile(height))
}

func (cs *CachedStore) openFile(height uint64) cache.OpenAccessorFn {
	return func(ctx context.Context) (eds.AccessorStreamer, error) {
		// open the file directly without calling GetByHeight of the inner getter to
		// avoid hitting the store cache a second time
		path := cs.store.heightToPath(height)
		return cs.store.openFile(path)
	}
}
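A hypothetical usage sketch of the new wrapper (the cache name "availability" and size 128 are assumed, not taken from this commit): WithCache layers a second cache over the store, and CachedStore.GetByHeight then serves reads through the combined cache, falling back to disk only on a miss.

package storeexample // hypothetical package, for illustration only

import (
	"context"

	eds "github.com/celestiaorg/celestia-node/share/new_eds"
	"github.com/celestiaorg/celestia-node/store"
)

func readThroughCachedStore(ctx context.Context, s *store.Store, height uint64) (eds.AccessorStreamer, error) {
	// wrap the store with an extra cache layer; the parent store gains
	// read-only visibility into everything the wrapper caches
	cached, err := s.WithCache("availability", 128)
	if err != nil {
		return nil, err
	}
	// served from the combined cache; loads from disk only on a miss
	return cached.GetByHeight(ctx, height)
}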
76 changes: 76 additions & 0 deletions store/store_cache_test.go
@@ -0,0 +1,76 @@
package store

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/store/cache"
)

func TestStore_WithCache(t *testing.T) {
	height := atomic.Uint64{}
	height.Store(1)

	t.Run("don't exist in first cache", func(t *testing.T) {
		// create store with no cache
		params := paramsNoCache()
		store, err := NewStore(params, t.TempDir())
		require.NoError(t, err)

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		t.Cleanup(cancel)
		eds, roots := randomEDS(t)
		height := height.Add(1)
		err = store.Put(ctx, roots, height, eds)
		require.NoError(t, err)

		// check that the height is not in the cache (cache was disabled)
		_, err = store.cache.Get(height)
		require.ErrorIs(t, err, cache.ErrCacheMiss)

		cachedStore, err := store.WithCache("test", 10)
		require.NoError(t, err)
		// load accessor to secondary cache by calling GetByHeight on cached store
		_, err = cachedStore.GetByHeight(ctx, height)
		require.NoError(t, err)

		// loaded accessor should be available in both original store and wrapped store
		_, err = store.cache.Get(height)
		require.NoError(t, err)
		_, err = cachedStore.combinedCache.Get(height)
		require.NoError(t, err)
	})

	t.Run("exists in first cache", func(t *testing.T) {
		store, err := NewStore(DefaultParameters(), t.TempDir())
		require.NoError(t, err)

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		t.Cleanup(cancel)
		eds, roots := randomEDS(t)
		height := height.Add(1)
		err = store.Put(ctx, roots, height, eds)
		require.NoError(t, err)

		_, err = store.cache.Get(height)
		require.NoError(t, err)

		withCache, err := store.WithCache("test", 10)
		require.NoError(t, err)
		_, err = withCache.GetByHeight(ctx, height)
		require.NoError(t, err)

		_, err = withCache.combinedCache.Second().Get(height)
		require.ErrorIs(t, err, cache.ErrCacheMiss)
	})
}

func paramsNoCache() *Parameters {
	params := DefaultParameters()
	params.RecentBlocksCacheSize = 0
	return params
}
13 changes: 2 additions & 11 deletions store/store_options.go
@@ -7,27 +7,18 @@ import (
type Parameters struct {
	// RecentBlocksCacheSize is the size of the cache for recent blocks.
	RecentBlocksCacheSize int

	// AvailabilityCacheSize is the size of the cache for accessors requested for serving availability
	// samples.
	AvailabilityCacheSize int
}

// DefaultParameters returns the default configuration values for the EDS store parameters.
func DefaultParameters() *Parameters {
	return &Parameters{
		RecentBlocksCacheSize: 10,
		AvailabilityCacheSize: 128,
	}
}

func (p *Parameters) Validate() error {
	if p.RecentBlocksCacheSize < 1 {
		return errors.New("recent eds cache size must be positive")
	}

	if p.AvailabilityCacheSize < 1 {
		return errors.New("availability cache size must be positive")
	if p.RecentBlocksCacheSize < 0 {
		return errors.New("recent eds cache size cannot be negative")
	}
	return nil
}
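A quick sketch of the relaxed validation, with the example values and helper name assumed: zero now passes Validate (and disables the recent-blocks cache), while negative sizes are still rejected.

package storeexample // hypothetical package, for illustration only

import (
	"fmt"

	"github.com/celestiaorg/celestia-node/store"
)

func checkCacheSizeValidation() {
	p := store.DefaultParameters()

	p.RecentBlocksCacheSize = 0
	fmt.Println(p.Validate()) // nil: zero disables the recent-blocks cache

	p.RecentBlocksCacheSize = -1
	fmt.Println(p.Validate()) // error: recent eds cache size cannot be negative
}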
28 changes: 10 additions & 18 deletions store/store_test.go
@@ -19,7 +19,7 @@ func TestEDSStore(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	t.Cleanup(cancel)

	edsStore, err := NewStore(DefaultParameters(), t.TempDir())
	edsStore, err := NewStore(paramsNoCache(), t.TempDir())
	require.NoError(t, err)

	// disable cache
@@ -138,9 +138,12 @@
	})

	t.Run("Remove", func(t *testing.T) {
		edsStore, err := NewStore(DefaultParameters(), t.TempDir())
		require.NoError(t, err)

		// removing file that does not exist should be noop
		missingHeight := height.Add(1)
		err := edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02})
		err = edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02})
		require.NoError(t, err)

		eds, roots := randomEDS(t)
@@ -248,14 +251,15 @@ func BenchmarkStore(b *testing.B) {
	ctx, cancel := context.WithCancel(context.Background())
	b.Cleanup(cancel)

	edsStore, err := NewStore(DefaultParameters(), b.TempDir())
	require.NoError(b, err)

	eds := edstest.RandEDS(b, 128)
	roots, err := share.NewAxisRoots(eds)
	require.NoError(b, err)

	// BenchmarkStore/bench_put_128-10 27 19209780 ns/op (~19ms)
	b.Run("put 128", func(b *testing.B) {
		edsStore, err := NewStore(paramsNoCache(), b.TempDir())
		require.NoError(b, err)

		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			roots := edstest.RandomAxisRoots(b, 1)
@@ -266,13 +270,7 @@ func BenchmarkStore(b *testing.B) {
	// read 128 EDSs does not read full EDS, but only the header
	// BenchmarkStore/bench_read_128-10 82766 14678 ns/op (~14mcs)
	b.Run("open by height, 128", func(b *testing.B) {
		edsStore, err := NewStore(DefaultParameters(), b.TempDir())
		require.NoError(b, err)

		// disable cache
		edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})

		roots, err := share.NewAxisRoots(eds)
		edsStore, err := NewStore(paramsNoCache(), b.TempDir())
		require.NoError(b, err)

		height := uint64(1984)
@@ -292,12 +290,6 @@
		edsStore, err := NewStore(DefaultParameters(), b.TempDir())
		require.NoError(b, err)

		// disable cache
		edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})

		roots, err := share.NewAxisRoots(eds)
		require.NoError(b, err)

		height := uint64(1984)
		err = edsStore.Put(ctx, roots, height, eds)
		require.NoError(b, err)
