Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Fix event index bounds checking in Access APIs #5321

Merged
merged 7 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ type FlowAccessNodeBuilder struct {
ExecutionIndexerCore *indexer.IndexerCore
ScriptExecutor *backend.ScriptExecutor
RegistersAsyncStore *execution.RegistersAsyncStore
EventsIndex *backend.EventsIndex
IndexerDependencies *cmd.DependencyList

// The sync engine participants provider is the libp2p peer store for the access node
Expand Down Expand Up @@ -773,11 +774,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

err = builder.RegistersAsyncStore.InitDataAvailable(registers)
if err != nil {
return nil, err
}

// setup requester to notify indexer when new execution data is received
execDataDistributor.AddOnExecutionDataReceivedConsumer(builder.ExecutionIndexer.OnExecutionData)

Expand All @@ -796,7 +792,20 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

builder.ScriptExecutor.InitReporter(builder.ExecutionIndexer, scripts)
err = builder.ScriptExecutor.Initialize(builder.ExecutionIndexer, scripts)
if err != nil {
return nil, err
}

err = builder.EventsIndex.Initialize(builder.ExecutionIndexer)
if err != nil {
return nil, err
}

err = builder.RegistersAsyncStore.Initialize(registers)
if err != nil {
return nil, err
}

return builder.ExecutionIndexer, nil
}, builder.IndexerDependencies)
Expand Down Expand Up @@ -837,7 +846,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.stateStreamConf,
node.State,
node.Storage.Headers,
node.Storage.Events,
node.Storage.Seals,
node.Storage.Results,
builder.ExecutionDataStore,
Expand All @@ -846,6 +854,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.RegistersAsyncStore,
builder.EventsIndex,
useIndex,
)
if err != nil {
Expand Down Expand Up @@ -1445,6 +1454,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.Storage.Events = bstorage.NewEvents(node.Metrics.Cache, node.DB)
return nil
}).
Module("events index", func(node *cmd.NodeConfig) error {
builder.EventsIndex = backend.NewEventsIndex(builder.Storage.Events)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1496,7 +1509,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
HistoricalAccessNodes: builder.HistoricalAccessRPCs,
Blocks: node.Storage.Blocks,
Headers: node.Storage.Headers,
Events: node.Storage.Events,
Collections: node.Storage.Collections,
Transactions: node.Storage.Transactions,
ExecutionReceipts: node.Storage.Receipts,
Expand All @@ -1516,6 +1528,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
ScriptExecutor: builder.ScriptExecutor,
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
EventsIndex: builder.EventsIndex,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/integration_unsecure_grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
conf,
suite.state,
suite.headers,
suite.events,
suite.seals,
suite.results,
nil,
Expand All @@ -252,6 +251,7 @@ func (suite *SameGRPCPortTestSuite) SetupTest() {
rootBlock.Header.Height,
rootBlock.Header.Height,
suite.registers,
backend.NewEventsIndex(suite.events),
false,
)
assert.NoError(suite.T(), err)
Expand Down
4 changes: 2 additions & 2 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type Params struct {
HistoricalAccessNodes []accessproto.AccessAPIClient
Blocks storage.Blocks
Headers storage.Headers
Events storage.Events
Collections storage.Collections
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
Expand All @@ -104,6 +103,7 @@ type Params struct {
ScriptExecutor execution.ScriptExecutor
ScriptExecutionMode IndexQueryMode
EventQueryMode IndexQueryMode
EventsIndex *EventsIndex
}

// New creates backend instance
Expand Down Expand Up @@ -183,12 +183,12 @@ func New(params Params) (*Backend, error) {
chain: params.ChainID.Chain(),
state: params.State,
headers: params.Headers,
events: params.Events,
executionReceipts: params.ExecutionReceipts,
connFactory: params.ConnFactory,
maxHeightRange: params.MaxHeightRange,
nodeCommunicator: params.Communicator,
queryMode: params.EventQueryMode,
eventsIndex: params.EventsIndex,
},
backendBlockHeaders: backendBlockHeaders{
headers: params.Headers,
Expand Down
7 changes: 3 additions & 4 deletions engine/access/rpc/backend/backend_accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
execmock "github.com/onflow/flow-go/module/execution/mock"
"github.com/onflow/flow-go/module/irrecoverable"
protocol "github.com/onflow/flow-go/state/protocol/mock"
Expand Down Expand Up @@ -222,7 +221,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromStorage_Fails() {
statusCode codes.Code
}{
{
err: execution.ErrDataNotAvailable,
err: storage.ErrHeightNotIndexed,
statusCode: codes.OutOfRange,
},
{
Expand Down Expand Up @@ -267,7 +266,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_HappyPath() {
backend.scriptExecMode = IndexQueryModeFailover
backend.scriptExecutor = scriptExecutor

for _, errToReturn := range []error{execution.ErrDataNotAvailable, storage.ErrNotFound} {
for _, errToReturn := range []error{storage.ErrHeightNotIndexed, storage.ErrNotFound} {
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.account.Address, s.block.Header.Height).
Return(nil, errToReturn).Times(3)

Expand Down Expand Up @@ -299,7 +298,7 @@ func (s *BackendAccountsSuite) TestGetAccountFromFailover_ReturnsENErrors() {

scriptExecutor := execmock.NewScriptExecutor(s.T())
scriptExecutor.On("GetAccountAtBlockHeight", mock.Anything, s.failingAddress, s.block.Header.Height).
Return(nil, execution.ErrDataNotAvailable)
Return(nil, storage.ErrHeightNotIndexed)

backend := s.defaultBackend()
backend.scriptExecMode = IndexQueryModeFailover
Expand Down
11 changes: 7 additions & 4 deletions engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ import (
"github.com/onflow/flow-go/model/events"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)

type backendEvents struct {
headers storage.Headers
events storage.Events
executionReceipts storage.ExecutionReceipts
state protocol.State
chain flow.Chain
Expand All @@ -36,6 +36,7 @@ type backendEvents struct {
maxHeightRange uint
nodeCommunicator Communicator
queryMode IndexQueryMode
eventsIndex *EventsIndex
}

// blockMetadata is used to capture information about requested blocks to avoid repeated blockID
Expand Down Expand Up @@ -226,15 +227,17 @@ func (b *backendEvents) getBlockEventsFromStorage(
) ([]flow.BlockEvents, []blockMetadata, error) {
missing := make([]blockMetadata, 0)
resp := make([]flow.BlockEvents, 0)

for _, blockInfo := range blockInfos {
if ctx.Err() != nil {
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.events.ByBlockID(blockInfo.ID)
events, err := b.eventsIndex.GetEvents(blockInfo.ID, blockInfo.Height)
if err != nil {
// Note: if there are no events for a block, an empty slice is returned
if errors.Is(err, storage.ErrNotFound) {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) ||
errors.Is(err, indexer.ErrIndexNotInitialized) {
missing = append(missing, blockInfo)
continue
}
Expand Down
34 changes: 25 additions & 9 deletions engine/access/rpc/backend/backend_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/irrecoverable"
syncmock "github.com/onflow/flow-go/module/state_synchronization/mock"
protocol "github.com/onflow/flow-go/state/protocol/mock"
"github.com/onflow/flow-go/storage"
storagemock "github.com/onflow/flow-go/storage/mock"
Expand All @@ -44,6 +45,7 @@ type BackendEventsSuite struct {
params *protocol.Params
rootHeader *flow.Header

eventsIndex *EventsIndex
events *storagemock.Events
headers *storagemock.Headers
receipts *storagemock.ExecutionReceipts
Expand Down Expand Up @@ -79,6 +81,7 @@ func (s *BackendEventsSuite) SetupTest() {

s.execClient = access.NewExecutionAPIClient(s.T())
s.executionNodes = unittest.IdentityListFixture(2, unittest.WithRole(flow.RoleExecution))
s.eventsIndex = NewEventsIndex(s.events)

blockCount := 5
s.blocks = make([]*flow.Block, blockCount)
Expand Down Expand Up @@ -163,13 +166,13 @@ func (s *BackendEventsSuite) defaultBackend() *backendEvents {
log: s.log,
chain: s.chainID.Chain(),
state: s.state,
events: s.events,
headers: s.headers,
executionReceipts: s.receipts,
connFactory: s.connectionFactory,
nodeCommunicator: NewNodeCommunicator(false),
maxHeightRange: DefaultMaxHeightRange,
queryMode: IndexQueryModeExecutionNodesOnly,
eventsIndex: s.eventsIndex,
}
}

Expand Down Expand Up @@ -250,6 +253,12 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {
startHeight := s.blocks[0].Header.Height
endHeight := s.sealedHead.Height

reporter := syncmock.NewIndexReporter(s.T())
reporter.On("LowestIndexedHeight").Return(startHeight, nil)
reporter.On("HighestIndexedHeight").Return(endHeight+10, nil)
err := s.eventsIndex.Initialize(reporter)
s.Require().NoError(err)

s.state.On("Sealed").Return(s.snapshot)
s.snapshot.On("Head").Return(s.sealedHead, nil)

Expand Down Expand Up @@ -289,6 +298,7 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {

s.Run(fmt.Sprintf("all from en - %s - %s", tt.encoding.String(), tt.queryMode), func() {
events := storagemock.NewEvents(s.T())
eventsIndex := NewEventsIndex(events)

switch tt.queryMode {
case IndexQueryModeLocalOnly:
Expand All @@ -298,12 +308,12 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {
// only calls to EN, no calls to storage
case IndexQueryModeFailover:
// all calls to storage fail
events.On("ByBlockID", mock.Anything).Return(nil, storage.ErrNotFound)
// simulated by not initializing the eventIndex so all calls return ErrIndexNotInitialized
}

backend := s.defaultBackend()
backend.queryMode = tt.queryMode
backend.events = events
backend.eventsIndex = eventsIndex

s.setupENSuccessResponse(targetEvent, s.blocks)

Expand All @@ -318,26 +328,32 @@ func (s *BackendEventsSuite) TestGetEvents_HappyPaths() {

s.Run(fmt.Sprintf("mixed storage & en - %s - %s", tt.encoding.String(), tt.queryMode), func() {
events := storagemock.NewEvents(s.T())
eventsIndex := NewEventsIndex(events)

switch tt.queryMode {
case IndexQueryModeLocalOnly, IndexQueryModeExecutionNodesOnly:
// not applicable
return
case IndexQueryModeFailover:
// only failing blocks queried from EN
s.setupENSuccessResponse(targetEvent, s.blocks[0:2])
s.setupENSuccessResponse(targetEvent, []*flow.Block{s.blocks[0], s.blocks[4]})
}

// the first 2 blocks are not available from storage, and should be fetched from the EN
events.On("ByBlockID", s.blockIDs[0]).Return(nil, storage.ErrNotFound)
events.On("ByBlockID", s.blockIDs[1]).Return(nil, storage.ErrNotFound)
// the first and last blocks are not available from storage, and should be fetched from the EN
reporter := syncmock.NewIndexReporter(s.T())
reporter.On("LowestIndexedHeight").Return(s.blocks[1].Header.Height, nil)
reporter.On("HighestIndexedHeight").Return(s.blocks[3].Header.Height, nil)

events.On("ByBlockID", s.blockIDs[1]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[2]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[3]).Return(s.blockEvents, nil)
events.On("ByBlockID", s.blockIDs[4]).Return(s.blockEvents, nil)

err := eventsIndex.Initialize(reporter)
s.Require().NoError(err)

backend := s.defaultBackend()
backend.queryMode = tt.queryMode
backend.events = events
backend.eventsIndex = eventsIndex

response, err := backend.GetEventsForBlockIDs(ctx, targetEvent, s.blockIDs, tt.encoding)
s.Require().NoError(err)
Expand Down
7 changes: 6 additions & 1 deletion engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
Expand Down Expand Up @@ -362,7 +363,11 @@ func convertIndexError(err error, height uint64, defaultMsg string) error {
return nil
}

if errors.Is(err, execution.ErrDataNotAvailable) {
if errors.Is(err, indexer.ErrIndexNotInitialized) {
return status.Errorf(codes.FailedPrecondition, "data for block is not available: %v", err)
}

if errors.Is(err, storage.ErrHeightNotIndexed) {
return status.Errorf(codes.OutOfRange, "data for block height %d is not available", height)
}

Expand Down
7 changes: 3 additions & 4 deletions engine/access/rpc/backend/backend_scripts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
connectionmock "github.com/onflow/flow-go/engine/access/rpc/connection/mock"
fvmerrors "github.com/onflow/flow-go/fvm/errors"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/execution"
execmock "github.com/onflow/flow-go/module/execution/mock"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -243,7 +242,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptFromStorage_Fails() {
statusCode codes.Code
}{
{
err: execution.ErrDataNotAvailable,
err: storage.ErrHeightNotIndexed,
statusCode: codes.OutOfRange,
},
{
Expand Down Expand Up @@ -288,7 +287,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptWithFailover_HappyPath() {
ctx := context.Background()

errors := []error{
execution.ErrDataNotAvailable,
storage.ErrHeightNotIndexed,
storage.ErrNotFound,
fmt.Errorf("system error"),
fvmFailureErr,
Expand Down Expand Up @@ -383,7 +382,7 @@ func (s *BackendScriptsSuite) TestExecuteScriptWithFailover_ReturnsENErrors() {
// configure local script executor to fail
scriptExecutor := execmock.NewScriptExecutor(s.T())
scriptExecutor.On("ExecuteAtBlockHeight", mock.Anything, mock.Anything, mock.Anything, s.block.Header.Height).
Return(nil, execution.ErrDataNotAvailable)
Return(nil, storage.ErrHeightNotIndexed)

backend := s.defaultBackend()
backend.scriptExecMode = IndexQueryModeFailover
Expand Down
Loading
Loading