Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Storehouse] Blockend Snapshot #4985

Merged
merged 13 commits into from
Nov 22, 2023
88 changes: 88 additions & 0 deletions engine/execution/storehouse/block_end_snapshot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package storehouse

import (
"errors"
"sync"

"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/fvm/storage/snapshot"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
)

var _ snapshot.StorageSnapshot = (*BlockEndStateSnapshot)(nil)

// BlockEndStateSnapshot represents the storage at the end of a block.
type BlockEndStateSnapshot struct {
storage execution.RegisterStore

blockID flow.Identifier
height uint64

mutex sync.RWMutex
readCache map[flow.RegisterID]flow.RegisterValue // cache the reads from storage at baseBlock
}

// the caller must ensure the block height is for the given block
func NewBlockEndStateSnapshot(
storage execution.RegisterStore,
blockID flow.Identifier,
height uint64,
) *BlockEndStateSnapshot {
return &BlockEndStateSnapshot{
storage: storage,
blockID: blockID,
height: height,
readCache: make(map[flow.RegisterID]flow.RegisterValue),
}
}

// Get returns the value of the register with the given register ID.
// It returns:
// - (value, nil) if the register exists
// - (nil, nil) if the register does not exist
// - (nil, storage.ErrHeightNotIndexed) if the height is below the first height that is indexed.
// - (nil, storehouse.ErrNotExecuted) if the block is not executed yet
// - (nil, storehouse.ErrNotExecuted) if the block is conflicting with finalized block
// - (nil, err) for any other exceptions
func (s *BlockEndStateSnapshot) Get(id flow.RegisterID) (flow.RegisterValue, error) {
value, ok := s.getFromCache(id)
if ok {
return value, nil
}

value, err := s.getFromStorage(id)
if err != nil {
return nil, err
}

s.mutex.Lock()
defer s.mutex.Unlock()

// TODO: consider adding a limit/eviction policy for the cache
s.readCache[id] = value
return value, err
}

func (s *BlockEndStateSnapshot) getFromCache(id flow.RegisterID) (flow.RegisterValue, bool) {
s.mutex.RLock()
defer s.mutex.RUnlock()

value, ok := s.readCache[id]
peterargue marked this conversation as resolved.
Show resolved Hide resolved
return value, ok
}

func (s *BlockEndStateSnapshot) getFromStorage(id flow.RegisterID) (flow.RegisterValue, error) {
value, err := s.storage.GetRegister(s.height, s.blockID, id)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
// if the error is not found, we return a nil RegisterValue,
// in this case, the nil value can be cached, because the storage will not change it
return nil, nil
}
// if the error is not ErrNotFound, such as storage.ErrHeightNotIndexed, storehouse.ErrNotExecuted
// we return the error without caching
return nil, err
}
return value, nil
}
102 changes: 102 additions & 0 deletions engine/execution/storehouse/block_end_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package storehouse_test

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"

executionMock "github.com/onflow/flow-go/engine/execution/mock"
"github.com/onflow/flow-go/engine/execution/storehouse"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/unittest"
)

func TestBlockEndSnapshot(t *testing.T) {
t.Run("Get register", func(t *testing.T) {
header := unittest.BlockHeaderFixture()

// create mock for storage
store := executionMock.NewRegisterStore(t)
reg := unittest.MakeOwnerReg("key", "value")
store.On("GetRegister", header.Height, header.ID(), reg.Key).Return(reg.Value, nil).Once()
snapshot := storehouse.NewBlockEndStateSnapshot(store, header.ID(), header.Height)

// test get from storage
value, err := snapshot.Get(reg.Key)
require.NoError(t, err)
require.Equal(t, reg.Value, value)

// test get from cache
value, err = snapshot.Get(reg.Key)
require.NoError(t, err)
require.Equal(t, reg.Value, value)

// test get non existing register
unknownReg := unittest.MakeOwnerReg("unknown", "unknown")
store.On("GetRegister", header.Height, header.ID(), unknownReg.Key).
Return(nil, fmt.Errorf("fail: %w", storage.ErrNotFound)).Once()

value, err = snapshot.Get(unknownReg.Key)
require.NoError(t, err)
require.Nil(t, value)

// test get non existing register from cache
_, err = snapshot.Get(unknownReg.Key)
require.NoError(t, err)
require.Nil(t, value)

// test getting storage.ErrHeightNotIndexed error
heightNotIndexed := unittest.MakeOwnerReg("height not index", "height not index")
store.On("GetRegister", header.Height, header.ID(), heightNotIndexed.Key).
Return(nil, fmt.Errorf("fail: %w", storage.ErrHeightNotIndexed)).
Twice() // to verify the result is not cached

// verify getting the correct error
_, err = snapshot.Get(heightNotIndexed.Key)
require.Error(t, err)
require.True(t, errors.Is(err, storage.ErrHeightNotIndexed))

// verify result is not cached
_, err = snapshot.Get(heightNotIndexed.Key)
require.Error(t, err)
require.True(t, errors.Is(err, storage.ErrHeightNotIndexed))

// test getting storage.ErrNotExecuted error
heightNotExecuted := unittest.MakeOwnerReg("height not executed", "height not executed")
counter := atomic.NewInt32(0)
store.
On("GetRegister", header.Height, header.ID(), heightNotExecuted.Key).
Return(func(uint64, flow.Identifier, flow.RegisterID) (flow.RegisterValue, error) {
counter.Inc()
// the first call should return error
if counter.Load() == 1 {
return nil, fmt.Errorf("fail: %w", storehouse.ErrNotExecuted)
}
// the second call, it returns value
return heightNotExecuted.Value, nil
}).
Times(2)

// first time should return error
_, err = snapshot.Get(heightNotExecuted.Key)
require.Error(t, err)
require.True(t, errors.Is(err, storehouse.ErrNotExecuted))

// second time should return value
value, err = snapshot.Get(heightNotExecuted.Key)
require.NoError(t, err)
require.Equal(t, heightNotExecuted.Value, value)

// third time should be cached
value, err = snapshot.Get(heightNotExecuted.Key)
require.NoError(t, err)
require.Equal(t, heightNotExecuted.Value, value)

store.AssertExpectations(t)
})

}
Loading