Skip to content

Commit

Permalink
Merge pull request #5321 from onflow/petera/fix-event-index-bounds-ch…
Browse files Browse the repository at this point in the history
…ecking

[Access] Fix event index bounds checking in Access APIs
  • Loading branch information
peterargue authored Jan 31, 2024
2 parents 55450cc + 978542e commit adbf9d0
Show file tree
Hide file tree
Showing 26 changed files with 754 additions and 143 deletions.
29 changes: 21 additions & 8 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ type FlowAccessNodeBuilder struct {
ExecutionIndexerCore *indexer.IndexerCore
ScriptExecutor *backend.ScriptExecutor
RegistersAsyncStore *execution.RegistersAsyncStore
EventsIndex *backend.EventsIndex
IndexerDependencies *cmd.DependencyList

// The sync engine participants provider is the libp2p peer store for the access node
Expand Down Expand Up @@ -773,11 +774,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

err = builder.RegistersAsyncStore.InitDataAvailable(registers)
if err != nil {
return nil, err
}

// setup requester to notify indexer when new execution data is received
execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.ExecutionIndexer.OnExecutionData)

Expand All @@ -796,7 +792,20 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

builder.ScriptExecutor.InitReporter(builder.ExecutionIndexer, scripts)
err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
return nil, err
}

err = builder.EventsIndex.Initialize(builder.ExecutionIndexer)
if err != nil {
return nil, err
}

err = builder.RegistersAsyncStore.Initialize(registers)
if err != nil {
return nil, err
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
Expand Down Expand Up @@ -837,7 +846,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.stateStreamConf,
node.State,
node.Storage.Headers,
node.Storage.Events,
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
Expand All @@ -846,6 +854,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore,
builder.EventsIndex,
useIndex,
)
if err != nil {
Expand Down Expand Up @@ -1445,6 +1454,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
builder.EventsIndex = backend.NewEventsIndex(builder.Storage.Events)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1496,7 +1509,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Events: node.Storage.Events,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
Expand All @@ -1516,6 +1528,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
EventsIndex: builder.EventsIndex,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
conf,
suite.state,
suite.headers,
suite.events,
suite.seals,
suite.results,
nil,
Expand All @@ -252,6 +251,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
rootBlock.Header.Height,
rootBlock.Header.Height,
suite.registers,
backend.NewEventsIndex(suite.events),
false,
)
assert.NoError(suite.T(), err)
Expand Down
4 changes: 2 additions & 2 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type Params struct {
HistoricalAccessNodes []accessproto.AccessAPIClient
Blocks storage.Blocks
Headers storage.Headers
Events storage.Events
Collections storage.Collections
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
Expand All @@ -104,6 +103,7 @@ type Params struct {
ScriptExecutor execution.ScriptExecutor
ScriptExecutionMode IndexQueryMode
EventQueryMode IndexQueryMode
EventsIndex *EventsIndex
}

// New creates backend instance
Expand Down Expand Up @@ -183,12 +183,12 @@ func New(params Params) (*Backend, error) {
chain: params.ChainID.Chain(),
state: params.State,
headers: params.Headers,
events: params.Events,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
maxHeightRange: params.MaxHeightRange,
nodeCommunicator: params.Communicator,
queryMode: params.EventQueryMode,
eventsIndex: params.EventsIndex,
},
backendBlockHeaders: backendBlockHeaders{
headers: params.Headers,
Expand Down
7 changes: 3 additions & 4 deletions engine/access/rpc/backend/backend_accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
execmock "github.com/onflow/flow-go/module/execution/mock"
"github.com/onflow/flow-go/module/irrecoverable"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand Down Expand Up @@ -222,7 +221,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromStorage_Fails() {
statusCode codes.Code
}{
{
err: execution.ErrDataNotAvailable,
err: storage.ErrHeightNotIndexed,
statusCode: codes.OutOfRange,
},
{
Expand Down Expand Up @@ -267,7 +266,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_HappyPath() {
backend.scriptExecMode = IndexQueryModeFailover
backend.scriptExecutor = scriptExecutor

for _, errToReturn := range []error{execution.ErrDataNotAvailable, storage.ErrNotFound} {
for _, errToReturn := range []error{storage.ErrHeightNotIndexed, storage.ErrNotFound} {
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.account.Address, s.block.Header.Height).
Return(nil, errToReturn).Times(3)

Expand Down Expand Up @@ -299,7 +298,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_ReturnsENErrors() {

scriptExecutor := execmock.NewScriptExecutor(s.T())
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.failingAddress, s.block.Header.Height).
Return(nil, execution.ErrDataNotAvailable)
Return(nil, storage.ErrHeightNotIndexed)

backend := s.defaultBackend()
backend.scriptExecMode = IndexQueryModeFailover
Expand Down
11 changes: 7 additions & 4 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"github.com/onflow/flow-go/model/events"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)

type backendEvents struct {
headers storage.Headers
events storage.Events
executionReceipts storage.ExecutionReceipts
state protocol.State
chain flow.Chain
Expand All @@ -36,6 +36,7 @@ type backendEvents struct {
maxHeightRange uint
nodeCommunicator Communicator
queryMode IndexQueryMode
eventsIndex *EventsIndex
}

// blockMetadata is used to capture information about requested blocks to avoid repeated blockID
Expand Down Expand Up @@ -226,15 +227,17 @@ func (b *backendEvents) getBlockEventsFromStorage(
) ([]flow.BlockEvents, []blockMetadata, error) {
missing := make([]blockMetadata, 0)
resp := make([]flow.BlockEvents, 0)

for _, blockInfo := range blockInfos {
if ctx.Err() != nil {
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.events.ByBlockID(blockInfo.ID)
events, err := b.eventsIndex.GetEvents(blockInfo.ID, blockInfo.Height)
if err != nil {
// Note: if there are no events for a block, an empty slice is returned
if errors.Is(err, storage.ErrNotFound) {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) ||
errors.Is(err, indexer.ErrIndexNotInitialized) {
missing = append(missing, blockInfo)
continue
}
Expand Down
34 changes: 25 additions & 9 deletions engine/access/rpc/backend/backend_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
protocol "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/storage"
storagemock "github.com/onflow/flow-go/storage/mock"
Expand All @@ -44,6 +45,7 @@ type BackendEventsSuite struct {
params *protocol.Params
rootHeader *flow.Header

eventsIndex *EventsIndex
events *storagemock.Events
headers *storagemock.Headers
receipts *storagemock.ExecutionReceipts
Expand Down Expand Up @@ -79,6 +81,7 @@ func (s *BackendEventsSuite) SetupTest() {

s.execClient = access.NewExecutionAPIClient(s.T())
s.executionNodes = unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
s.eventsIndex = NewEventsIndex(s.events)

blockCount := 5
s.blocks = make([]*flow.Block, blockCount)
Expand Down Expand Up @@ -163,13 +166,13 @@ func (s *BackendEventsSuite) defaultBackend() *backendEvents {
log: s.log,
chain: s.chainID.Chain(),
state: s.state,
events: s.events,
headers: s.headers,
executionReceipts: s.receipts,
connFactory: s.connectionFactory,
nodeCommunicator: NewNodeCommunicator(false),
maxHeightRange: DefaultMaxHeightRange,
queryMode: IndexQueryModeExecutionNodesOnly,
eventsIndex: s.eventsIndex,
}
}

Expand Down Expand Up @@ -250,6 +253,12 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {
startHeight := s.blocks[0].Header.Height
endHeight := s.sealedHead.Height

reporter := syncmock.NewIndexReporter(s.T())
reporter.On("LowestIndexedHeight").Return(startHeight, nil)
reporter.On("HighestIndexedHeight").Return(endHeight+10, nil)
err := s.eventsIndex.Initialize(reporter)
s.Require().NoError(err)

s.state.On("Sealed").Return(s.snapshot)
s.snapshot.On("Head").Return(s.sealedHead, nil)

Expand Down Expand Up @@ -289,6 +298,7 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {

s.Run(fmt.Sprintf("all from en - %s - %s", tt.encoding.String(), tt.queryMode), func() {
events := storagemock.NewEvents(s.T())
eventsIndex := NewEventsIndex(events)

switch tt.queryMode {
case IndexQueryModeLocalOnly:
Expand All @@ -298,12 +308,12 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {
// only calls to EN, no calls to storage
case IndexQueryModeFailover:
// all calls to storage fail
events.On("ByBlockID", mock.Anything).Return(nil, storage.ErrNotFound)
// simulated by not initializing the eventIndex so all calls return ErrIndexNotInitialized
}

backend := s.defaultBackend()
backend.queryMode = tt.queryMode
backend.events = events
backend.eventsIndex = eventsIndex

s.setupENSuccessResponse(targetEvent, s.blocks)

Expand All @@ -318,26 +328,32 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {

s.Run(fmt.Sprintf("mixed storage & en - %s - %s", tt.encoding.String(), tt.queryMode), func() {
events := storagemock.NewEvents(s.T())
eventsIndex := NewEventsIndex(events)

switch tt.queryMode {
case IndexQueryModeLocalOnly, IndexQueryModeExecutionNodesOnly:
// not applicable
return
case IndexQueryModeFailover:
// only failing blocks queried from EN
s.setupENSuccessResponse(targetEvent, s.blocks[0:2])
s.setupENSuccessResponse(targetEvent, []*flow.Block{s.blocks[0], s.blocks[4]})
}

// the first 2 blocks are not available from storage, and should be fetched from the EN
events.On("ByBlockID", s.blockIDs[0]).Return(nil, storage.ErrNotFound)
events.On("ByBlockID", s.blockIDs[1]).Return(nil, storage.ErrNotFound)
// the first and last blocks are not available from storage, and should be fetched from the EN
reporter := syncmock.NewIndexReporter(s.T())
reporter.On("LowestIndexedHeight").Return(s.blocks[1].Header.Height, nil)
reporter.On("HighestIndexedHeight").Return(s.blocks[3].Header.Height, nil)

events.On("ByBlockID", s.blockIDs[1]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[2]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[3]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[4]).Return(s.blockEvents, nil)

err := eventsIndex.Initialize(reporter)
s.Require().NoError(err)

backend := s.defaultBackend()
backend.queryMode = tt.queryMode
backend.events = events
backend.eventsIndex = eventsIndex

response, err := backend.GetEventsForBlockIDs(ctx, targetEvent, s.blockIDs, tt.encoding)
s.Require().NoError(err)
Expand Down
7 changes: 6 additions & 1 deletion engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
Expand Down Expand Up @@ -362,7 +363,11 @@ func convertIndexError(err error, height uint64, defaultMsg string) error {
return nil
}

if errors.Is(err, execution.ErrDataNotAvailable) {
if errors.Is(err, indexer.ErrIndexNotInitialized) {
return status.Errorf(codes.FailedPrecondition, "data for block is not available: %v", err)
}

if errors.Is(err, storage.ErrHeightNotIndexed) {
return status.Errorf(codes.OutOfRange, "data for block height %d is not available", height)
}

Expand Down
7 changes: 3 additions & 4 deletions engine/access/rpc/backend/backend_scripts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
fvmerrors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
execmock "github.com/onflow/flow-go/module/execution/mock"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -243,7 +242,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptFromStorage_Fails() {
statusCode codes.Code
}{
{
err: execution.ErrDataNotAvailable,
err: storage.ErrHeightNotIndexed,
statusCode: codes.OutOfRange,
},
{
Expand Down Expand Up @@ -288,7 +287,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptWithFailover_HappyPath() {
ctx := context.Background()

errors := []error{
execution.ErrDataNotAvailable,
storage.ErrHeightNotIndexed,
storage.ErrNotFound,
fmt.Errorf("system error"),
fvmFailureErr,
Expand Down Expand Up @@ -383,7 +382,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptWithFailover_ReturnsENErrors() {
// configure local script executor to fail
scriptExecutor := execmock.NewScriptExecutor(s.T())
scriptExecutor.On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, s.block.Header.Height).
Return(nil, execution.ErrDataNotAvailable)
Return(nil, storage.ErrHeightNotIndexed)

backend := s.defaultBackend()
backend.scriptExecMode = IndexQueryModeFailover
Expand Down
Loading

0 comments on commit adbf9d0

Please sign in to comment.