Skip to content

Commit

Permalink
Merge pull request #4565 from nozim/3932-refactor-badger-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
durkmurder authored Jul 27, 2023
2 parents 0607ed2 + 9d68658 commit 3f3da35
Show file tree
Hide file tree
Showing 22 changed files with 219 additions and 281 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ require (
require (
github.com/coreos/go-semver v0.3.0
github.com/go-playground/validator/v10 v10.14.1
github.com/hashicorp/golang-lru/v2 v2.0.2
github.com/mitchellh/mapstructure v1.5.0
github.com/onflow/wal v0.0.0-20230529184820-bc9f8244608d
github.com/slok/go-http-metrics v0.10.0
Expand Down Expand Up @@ -167,7 +168,6 @@ require (
github.com/googleapis/gax-go/v2 v2.7.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/huin/goupnp v1.2.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
Expand Down
22 changes: 10 additions & 12 deletions storage/badger/approvals.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@ import (
// ResultApprovals implements persistent storage for result approvals.
type ResultApprovals struct {
db *badger.DB
cache *Cache
cache *Cache[flow.Identifier, *flow.ResultApproval]
}

func NewResultApprovals(collector module.CacheMetrics, db *badger.DB) *ResultApprovals {

store := func(key interface{}, val interface{}) func(*transaction.Tx) error {
approval := val.(*flow.ResultApproval)
return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(approval)))
store := func(key flow.Identifier, val *flow.ResultApproval) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertResultApproval(val)))
}

retrieve := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
approvalID := key.(flow.Identifier)
retrieve := func(approvalID flow.Identifier) func(tx *badger.Txn) (*flow.ResultApproval, error) {
var approval flow.ResultApproval
return func(tx *badger.Txn) (interface{}, error) {
return func(tx *badger.Txn) (*flow.ResultApproval, error) {
err := operation.RetrieveResultApproval(approvalID, &approval)(tx)
return &approval, err
}
}

res := &ResultApprovals{
db: db,
cache: newCache(collector, metrics.ResourceResultApprovals,
withLimit(flow.DefaultTransactionExpiry+100),
withStore(store),
withRetrieve(retrieve)),
cache: newCache[flow.Identifier, *flow.ResultApproval](collector, metrics.ResourceResultApprovals,
withLimit[flow.Identifier, *flow.ResultApproval](flow.DefaultTransactionExpiry+100),
withStore[flow.Identifier, *flow.ResultApproval](store),
withRetrieve[flow.Identifier, *flow.ResultApproval](retrieve)),
}

return res
Expand All @@ -57,7 +55,7 @@ func (r *ResultApprovals) byID(approvalID flow.Identifier) func(*badger.Txn) (*f
if err != nil {
return nil, err
}
return val.(*flow.ResultApproval), nil
return val, nil
}
}

Expand Down
65 changes: 33 additions & 32 deletions storage/badger/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,92 +5,92 @@ import (
"fmt"

"github.com/dgraph-io/badger/v2"
lru "github.com/hashicorp/golang-lru"
lru "github.com/hashicorp/golang-lru/v2"

"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/storage/badger/transaction"
)

func withLimit(limit uint) func(*Cache) {
return func(c *Cache) {
func withLimit[K comparable, V any](limit uint) func(*Cache[K, V]) {
return func(c *Cache[K, V]) {
c.limit = limit
}
}

type storeFunc func(key interface{}, val interface{}) func(*transaction.Tx) error
type storeFunc[K comparable, V any] func(key K, val V) func(*transaction.Tx) error

const DefaultCacheSize = uint(1000)

func withStore(store storeFunc) func(*Cache) {
return func(c *Cache) {
func withStore[K comparable, V any](store storeFunc[K, V]) func(*Cache[K, V]) {
return func(c *Cache[K, V]) {
c.store = store
}
}

func noStore(key interface{}, val interface{}) func(*transaction.Tx) error {
func noStore[K comparable, V any](_ K, _ V) func(*transaction.Tx) error {
return func(tx *transaction.Tx) error {
return fmt.Errorf("no store function for cache put available")
}
}

func noopStore(key interface{}, val interface{}) func(*transaction.Tx) error {
func noopStore[K comparable, V any](_ K, _ V) func(*transaction.Tx) error {
return func(tx *transaction.Tx) error {
return nil
}
}

type retrieveFunc func(key interface{}) func(*badger.Txn) (interface{}, error)
type retrieveFunc[K comparable, V any] func(key K) func(*badger.Txn) (V, error)

func withRetrieve(retrieve retrieveFunc) func(*Cache) {
return func(c *Cache) {
func withRetrieve[K comparable, V any](retrieve retrieveFunc[K, V]) func(*Cache[K, V]) {
return func(c *Cache[K, V]) {
c.retrieve = retrieve
}
}

func noRetrieve(key interface{}) func(*badger.Txn) (interface{}, error) {
return func(tx *badger.Txn) (interface{}, error) {
return nil, fmt.Errorf("no retrieve function for cache get available")
func noRetrieve[K comparable, V any](_ K) func(*badger.Txn) (V, error) {
return func(tx *badger.Txn) (V, error) {
var nullV V
return nullV, fmt.Errorf("no retrieve function for cache get available")
}
}

type Cache struct {
type Cache[K comparable, V any] struct {
metrics module.CacheMetrics
limit uint
store storeFunc
retrieve retrieveFunc
store storeFunc[K, V]
retrieve retrieveFunc[K, V]
resource string
cache *lru.Cache
cache *lru.Cache[K, V]
}

func newCache(collector module.CacheMetrics, resourceName string, options ...func(*Cache)) *Cache {
c := Cache{
func newCache[K comparable, V any](collector module.CacheMetrics, resourceName string, options ...func(*Cache[K, V])) *Cache[K, V] {
c := Cache[K, V]{
metrics: collector,
limit: 1000,
store: noStore,
retrieve: noRetrieve,
store: noStore[K, V],
retrieve: noRetrieve[K, V],
resource: resourceName,
}
for _, option := range options {
option(&c)
}
c.cache, _ = lru.New(int(c.limit))
c.cache, _ = lru.New[K, V](int(c.limit))
c.metrics.CacheEntries(c.resource, uint(c.cache.Len()))
return &c
}

// IsCached returns true if the key exists in the cache.
// It DOES NOT check whether the key exists in the underlying data store.
func (c *Cache) IsCached(key any) bool {
exists := c.cache.Contains(key)
return exists
func (c *Cache[K, V]) IsCached(key K) bool {
return c.cache.Contains(key)
}

// Get will try to retrieve the resource from cache first, and then from the
// injected. During normal operations, the following error returns are expected:
// - `storage.ErrNotFound` if key is unknown.
func (c *Cache) Get(key interface{}) func(*badger.Txn) (interface{}, error) {
return func(tx *badger.Txn) (interface{}, error) {
func (c *Cache[K, V]) Get(key K) func(*badger.Txn) (V, error) {
return func(tx *badger.Txn) (V, error) {

// check if we have it in the cache
resource, cached := c.cache.Get(key)
Expand All @@ -105,7 +105,8 @@ func (c *Cache) Get(key interface{}) func(*badger.Txn) (interface{}, error) {
if errors.Is(err, storage.ErrNotFound) {
c.metrics.CacheNotFound(c.resource)
}
return nil, fmt.Errorf("could not retrieve resource: %w", err)
var nullV V
return nullV, fmt.Errorf("could not retrieve resource: %w", err)
}

c.metrics.CacheMiss(c.resource)
Expand All @@ -120,12 +121,12 @@ func (c *Cache) Get(key interface{}) func(*badger.Txn) (interface{}, error) {
}
}

func (c *Cache) Remove(key interface{}) {
func (c *Cache[K, V]) Remove(key K) {
c.cache.Remove(key)
}

// Insert will add a resource directly to the cache with the given ID
func (c *Cache) Insert(key interface{}, resource interface{}) {
func (c *Cache[K, V]) Insert(key K, resource V) {
// cache the resource and eject least recently used one if we reached limit
evicted := c.cache.Add(key, resource)
if !evicted {
Expand All @@ -134,7 +135,7 @@ func (c *Cache) Insert(key interface{}, resource interface{}) {
}

// PutTx will return tx which adds a resource to the cache with the given ID.
func (c *Cache) PutTx(key interface{}, resource interface{}) func(*transaction.Tx) error {
func (c *Cache[K, V]) PutTx(key K, resource V) func(*transaction.Tx) error {
storeOps := c.store(key, resource) // assemble DB operations to store resource (no execution)

return func(tx *transaction.Tx) error {
Expand Down
3 changes: 2 additions & 1 deletion storage/badger/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ import (

"github.com/stretchr/testify/assert"

"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/utils/unittest"
)

// TestCache_Exists tests existence checking items in the cache.
func TestCache_Exists(t *testing.T) {
cache := newCache(metrics.NewNoopCollector(), "test")
cache := newCache[flow.Identifier, any](metrics.NewNoopCollector(), "test")

t.Run("non-existent", func(t *testing.T) {
key := unittest.IdentifierFixture()
Expand Down
23 changes: 10 additions & 13 deletions storage/badger/chunkDataPacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,25 @@ import (
type ChunkDataPacks struct {
db *badger.DB
collections storage.Collections
byChunkIDCache *Cache
byChunkIDCache *Cache[flow.Identifier, *badgermodel.StoredChunkDataPack]
}

func NewChunkDataPacks(collector module.CacheMetrics, db *badger.DB, collections storage.Collections, byChunkIDCacheSize uint) *ChunkDataPacks {

store := func(key interface{}, val interface{}) func(*transaction.Tx) error {
chdp := val.(*badgermodel.StoredChunkDataPack)
return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(chdp)))
store := func(key flow.Identifier, val *badgermodel.StoredChunkDataPack) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.InsertChunkDataPack(val)))
}

retrieve := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
chunkID := key.(flow.Identifier)

var c badgermodel.StoredChunkDataPack
return func(tx *badger.Txn) (interface{}, error) {
err := operation.RetrieveChunkDataPack(chunkID, &c)(tx)
retrieve := func(key flow.Identifier) func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
return func(tx *badger.Txn) (*badgermodel.StoredChunkDataPack, error) {
var c badgermodel.StoredChunkDataPack
err := operation.RetrieveChunkDataPack(key, &c)(tx)
return &c, err
}
}

cache := newCache(collector, metrics.ResourceChunkDataPack,
withLimit(byChunkIDCacheSize),
cache := newCache[flow.Identifier, *badgermodel.StoredChunkDataPack](collector, metrics.ResourceChunkDataPack,
withLimit[flow.Identifier, *badgermodel.StoredChunkDataPack](byChunkIDCacheSize),
withStore(store),
withRetrieve(retrieve),
)
Expand Down Expand Up @@ -135,7 +132,7 @@ func (ch *ChunkDataPacks) retrieveCHDP(chunkID flow.Identifier) func(*badger.Txn
if err != nil {
return nil, err
}
return val.(*badgermodel.StoredChunkDataPack), nil
return val, nil
}
}

Expand Down
17 changes: 7 additions & 10 deletions storage/badger/cluster_payloads.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,27 @@ import (
// cluster consensus.
type ClusterPayloads struct {
db *badger.DB
cache *Cache
cache *Cache[flow.Identifier, *cluster.Payload]
}

func NewClusterPayloads(cacheMetrics module.CacheMetrics, db *badger.DB) *ClusterPayloads {

store := func(key interface{}, val interface{}) func(*transaction.Tx) error {
blockID := key.(flow.Identifier)
payload := val.(*cluster.Payload)
store := func(blockID flow.Identifier, payload *cluster.Payload) func(*transaction.Tx) error {
return transaction.WithTx(procedure.InsertClusterPayload(blockID, payload))
}

retrieve := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
blockID := key.(flow.Identifier)
retrieve := func(blockID flow.Identifier) func(tx *badger.Txn) (*cluster.Payload, error) {
var payload cluster.Payload
return func(tx *badger.Txn) (interface{}, error) {
return func(tx *badger.Txn) (*cluster.Payload, error) {
err := procedure.RetrieveClusterPayload(blockID, &payload)(tx)
return &payload, err
}
}

cp := &ClusterPayloads{
db: db,
cache: newCache(cacheMetrics, metrics.ResourceClusterPayload,
withLimit(flow.DefaultTransactionExpiry*4),
cache: newCache[flow.Identifier, *cluster.Payload](cacheMetrics, metrics.ResourceClusterPayload,
withLimit[flow.Identifier, *cluster.Payload](flow.DefaultTransactionExpiry*4),
withStore(store),
withRetrieve(retrieve)),
}
Expand All @@ -56,7 +53,7 @@ func (cp *ClusterPayloads) retrieveTx(blockID flow.Identifier) func(*badger.Txn)
if err != nil {
return nil, err
}
return val.(*cluster.Payload), nil
return val, nil
}
}

Expand Down
19 changes: 8 additions & 11 deletions storage/badger/commits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,27 @@ import (

type Commits struct {
db *badger.DB
cache *Cache
cache *Cache[flow.Identifier, flow.StateCommitment]
}

func NewCommits(collector module.CacheMetrics, db *badger.DB) *Commits {

store := func(key interface{}, val interface{}) func(*transaction.Tx) error {
blockID := key.(flow.Identifier)
commit := val.(flow.StateCommitment)
store := func(blockID flow.Identifier, commit flow.StateCommitment) func(*transaction.Tx) error {
return transaction.WithTx(operation.SkipDuplicates(operation.IndexStateCommitment(blockID, commit)))
}

retrieve := func(key interface{}) func(tx *badger.Txn) (interface{}, error) {
blockID := key.(flow.Identifier)
var commit flow.StateCommitment
return func(tx *badger.Txn) (interface{}, error) {
retrieve := func(blockID flow.Identifier) func(tx *badger.Txn) (flow.StateCommitment, error) {
return func(tx *badger.Txn) (flow.StateCommitment, error) {
var commit flow.StateCommitment
err := operation.LookupStateCommitment(blockID, &commit)(tx)
return commit, err
}
}

c := &Commits{
db: db,
cache: newCache(collector, metrics.ResourceCommit,
withLimit(1000),
cache: newCache[flow.Identifier, flow.StateCommitment](collector, metrics.ResourceCommit,
withLimit[flow.Identifier, flow.StateCommitment](1000),
withStore(store),
withRetrieve(retrieve),
),
Expand All @@ -55,7 +52,7 @@ func (c *Commits) retrieveTx(blockID flow.Identifier) func(tx *badger.Txn) (flow
if err != nil {
return flow.DummyStateCommitment, err
}
return val.(flow.StateCommitment), nil
return val, nil
}
}

Expand Down
Loading

0 comments on commit 3f3da35

Please sign in to comment.