Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shwap): Add caching to blockstore #3615

Merged
merged 4 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion store/cache/accessor_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (bc *AccessorCache) Remove(height uint64) error {

// EnableMetrics enables metrics for the cache.
func (bc *AccessorCache) EnableMetrics() (unreg func() error, err error) {
bc.metrics, err = newMetrics(bc)
if bc.metrics == nil {
bc.metrics, err = newMetrics(bc)
}
return bc.metrics.reg.Unregister, err
}

Expand Down
17 changes: 17 additions & 0 deletions store/cache/doublecache.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache

import (
"context"
"errors"
"fmt"

Expand Down Expand Up @@ -29,6 +30,22 @@ func (mc *DoubleCache) Get(height uint64) (eds.AccessorStreamer, error) {
return mc.second.Get(height)
}

// GetOrLoad attempts to get an item from the both caches and, if not found, invokes
// the provided loader function to load it into the first Cache.
func (mc *DoubleCache) GetOrLoad(
ctx context.Context,
height uint64,
loader OpenAccessorFn,
) (eds.AccessorStreamer, error) {
// look-up in second cache first
accessor, err := mc.second.Get(height)
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
if err == nil {
return accessor, nil
}
// not found in second, get or load from first one
return mc.first.GetOrLoad(ctx, height, loader)
}

// Remove removes an item from all underlying caches
func (mc *DoubleCache) Remove(height uint64) error {
err1 := mc.first.Remove(height)
Expand Down
21 changes: 15 additions & 6 deletions store/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/celestiaorg/celestia-node/store/cache"
)

const (
Expand Down Expand Up @@ -57,19 +59,26 @@ func (s *Store) WithMetrics() error {
return err
}

unreg, err := s.cache.EnableMetrics()
if err != nil {
return fmt.Errorf("while enabling metrics for cache: %w", err)
}

s.metrics = &metrics{
put: put,
putExists: putExists,
get: get,
has: has,
remove: remove,
unreg: unreg,
}
return s.metrics.addCacheMetrics(s.cache)
}

// addCacheMetrics adds cache metrics to store metrics
func (m *metrics) addCacheMetrics(c cache.Cache) error {
if m == nil {
return nil
}
unreg, err := c.EnableMetrics()
if err != nil {
return fmt.Errorf("while enabling metrics for cache: %w", err)
}
m.unreg = unreg
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Store struct {
// basepath is the root directory of the store
basepath string
// cache is used to cache recent blocks and blocks that are accessed frequently
cache *cache.DoubleCache
cache cache.Cache
// stripedLocks is used to synchronize parallel operations
stripLock *striplock
metrics *metrics
Expand Down Expand Up @@ -74,19 +74,19 @@ func NewStore(params *Parameters, basePath string) (*Store, error) {
return nil, fmt.Errorf("creating empty file: %w", err)
}

recentEDSCache, err := cache.NewAccessorCache("recent", params.RecentBlocksCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create recent eds cache: %w", err)
}
var recentCache cache.Cache
recentCache = cache.NoopCache{}
if params.RecentBlocksCacheSize > 0 {
recentCache, err = cache.NewAccessorCache("recent", params.RecentBlocksCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create recent eds cache: %w", err)
}

availabilityCache, err := cache.NewAccessorCache("availability", params.AvailabilityCacheSize)
if err != nil {
return nil, fmt.Errorf("failed to create availability cache: %w", err)
}

store := &Store{
basepath: basePath,
cache: cache.NewDoubleCache(recentEDSCache, availabilityCache),
cache: recentCache,
stripLock: newStripLock(1024),
}
return store, nil
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *Store) Put(

// put to cache before writing to make it accessible while write is happening
accessor := &eds.Rsmt2D{ExtendedDataSquare: square}
acc, err := s.cache.First().GetOrLoad(ctx, height, accessorLoader(accessor))
acc, err := s.cache.GetOrLoad(ctx, height, accessorLoader(accessor))
if err != nil {
log.Warnf("failed to put Accessor in the recent cache: %s", err)
} else {
Expand Down
54 changes: 54 additions & 0 deletions store/store_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package store

import (
"context"
"fmt"

eds "github.com/celestiaorg/celestia-node/share/new_eds"
"github.com/celestiaorg/celestia-node/store/cache"
)

// CachedStore is a store with an additional cache layer. New cache layer is created on top of the
// original store cache. Parent store cache will be able to read from the new cache layer, but will
// not be able to write to it. Making parent store cache and CachedStore cache independent for writes.
type CachedStore struct {
store *Store
combinedCache *cache.DoubleCache
}

// WithCache wraps store with extra layer of cache. Created caching layer will have read access to original
// store cache and will duplicate it's content. It updates parent store cache, to allow it to
// read from additionally created cache layer.
func (s *Store) WithCache(name string, size int) (*CachedStore, error) {
newCache, err := cache.NewAccessorCache(name, size)
if err != nil {
return nil, fmt.Errorf("failed to create %s cache: %w", name, err)
}

wrappedCache := cache.NewDoubleCache(s.cache, newCache)
s.metrics.addCacheMetrics(wrappedCache)
// update parent store cache to allow it to read from both caches
s.cache = wrappedCache
return &CachedStore{
store: s,
combinedCache: wrappedCache,
}, nil
}

// GetByHeight returns accessor for given height and puts it into cache.
func (cs *CachedStore) GetByHeight(ctx context.Context, height uint64) (eds.AccessorStreamer, error) {
acc, err := cs.combinedCache.First().Get(height)
if err == nil {
return acc, err
}
return cs.combinedCache.Second().GetOrLoad(ctx, height, cs.openFile(height))
}

func (cs *CachedStore) openFile(height uint64) cache.OpenAccessorFn {
return func(ctx context.Context) (eds.AccessorStreamer, error) {
// open file directly wihout calling GetByHeight of inner getter to
// avoid hitting store cache second time
path := cs.store.heightToPath(height)
return cs.store.openFile(path)
}
}
76 changes: 76 additions & 0 deletions store/store_cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package store

import (
"context"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-node/store/cache"
)

func TestStore_WithCache(t *testing.T) {
height := atomic.Uint64{}
height.Store(1)

t.Run("don't exist in first cache", func(t *testing.T) {
// create store with no cache
params := paramsNoCache()
store, err := NewStore(params, t.TempDir())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
eds, roots := randomEDS(t)
height := height.Add(1)
err = store.Put(ctx, roots, height, eds)
require.NoError(t, err)

// check that the height is not in the cache (cache was disabled)
_, err = store.cache.Get(height)
require.ErrorIs(t, err, cache.ErrCacheMiss)

cachedStore, err := store.WithCache("test", 10)
require.NoError(t, err)
// load accessor to secondary cache by calling GetByHeight on cached store
_, err = cachedStore.GetByHeight(ctx, height)
require.NoError(t, err)

// loaded accessor should be available in both original store and wrapped store
_, err = store.cache.Get(height)
require.NoError(t, err)
_, err = cachedStore.combinedCache.Get(height)
require.NoError(t, err)
})

t.Run("exists in first cache", func(t *testing.T) {
store, err := NewStore(DefaultParameters(), t.TempDir())
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)
eds, roots := randomEDS(t)
height := height.Add(1)
err = store.Put(ctx, roots, height, eds)
require.NoError(t, err)

_, err = store.cache.Get(height)
require.NoError(t, err)

withCache, err := store.WithCache("test", 10)
require.NoError(t, err)
_, err = withCache.GetByHeight(ctx, height)
require.NoError(t, err)

_, err = withCache.combinedCache.Second().Get(height)
require.ErrorIs(t, err, cache.ErrCacheMiss)
})
}

func paramsNoCache() *Parameters {
params := DefaultParameters()
params.RecentBlocksCacheSize = 0
return params
}
13 changes: 2 additions & 11 deletions store/store_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,18 @@ import (
type Parameters struct {
// RecentBlocksCacheSize is the size of the cache for recent blocks.
RecentBlocksCacheSize int

// AvailabilityCacheSize is the size of the cache for accessors requested for serving availability
// samples.
AvailabilityCacheSize int
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}

// DefaultParameters returns the default configuration values for the EDS store parameters.
func DefaultParameters() *Parameters {
return &Parameters{
RecentBlocksCacheSize: 10,
AvailabilityCacheSize: 128,
}
}

func (p *Parameters) Validate() error {
if p.RecentBlocksCacheSize < 1 {
return errors.New("recent eds cache size must be positive")
}

if p.AvailabilityCacheSize < 1 {
return errors.New("availability cache size must be positive")
if p.RecentBlocksCacheSize < 0 {
return errors.New("recent eds cache size cannot be negative")
}
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
return nil
}
28 changes: 10 additions & 18 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestEDSStore(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
t.Cleanup(cancel)

edsStore, err := NewStore(DefaultParameters(), t.TempDir())
edsStore, err := NewStore(paramsNoCache(), t.TempDir())
require.NoError(t, err)

// disable cache
Expand Down Expand Up @@ -138,9 +138,12 @@ func TestEDSStore(t *testing.T) {
})

t.Run("Remove", func(t *testing.T) {
edsStore, err := NewStore(DefaultParameters(), t.TempDir())
require.NoError(t, err)

// removing file that does not exist should be noop
missingHeight := height.Add(1)
err := edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02})
err = edsStore.Remove(ctx, missingHeight, share.DataHash{0x01, 0x02})
require.NoError(t, err)

eds, roots := randomEDS(t)
Expand Down Expand Up @@ -248,14 +251,15 @@ func BenchmarkStore(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

edsStore, err := NewStore(DefaultParameters(), b.TempDir())
require.NoError(b, err)

eds := edstest.RandEDS(b, 128)
roots, err := share.NewAxisRoots(eds)
require.NoError(b, err)

// BenchmarkStore/bench_put_128-10 27 19209780 ns/op (~19ms)
b.Run("put 128", func(b *testing.B) {
edsStore, err := NewStore(paramsNoCache(), b.TempDir())
require.NoError(b, err)

b.ResetTimer()
for i := 0; i < b.N; i++ {
roots := edstest.RandomAxisRoots(b, 1)
Expand All @@ -266,13 +270,7 @@ func BenchmarkStore(b *testing.B) {
// read 128 EDSs does not read full EDS, but only the header
// BenchmarkStore/bench_read_128-10 82766 14678 ns/op (~14mcs)
b.Run("open by height, 128", func(b *testing.B) {
edsStore, err := NewStore(DefaultParameters(), b.TempDir())
require.NoError(b, err)

// disable cache
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})

roots, err := share.NewAxisRoots(eds)
edsStore, err := NewStore(paramsNoCache(), b.TempDir())
require.NoError(b, err)

height := uint64(1984)
Expand All @@ -292,12 +290,6 @@ func BenchmarkStore(b *testing.B) {
edsStore, err := NewStore(DefaultParameters(), b.TempDir())
require.NoError(b, err)

// disable cache
edsStore.cache = cache.NewDoubleCache(cache.NoopCache{}, cache.NoopCache{})

roots, err := share.NewAxisRoots(eds)
require.NoError(b, err)

height := uint64(1984)
err = edsStore.Put(ctx, roots, height, eds)
require.NoError(b, err)
Expand Down
Loading