Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Access] Add implementation for usage of the local transaction result in Access API #5306

Merged
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
955885a
Added fetching of transaction result from local storage
Guitarheroua Jan 19, 2024
de15071
Create transactions local data provider
Guitarheroua Jan 26, 2024
712a29f
Fixed tests
Guitarheroua Jan 26, 2024
c970180
Added documentation to local data provider
Guitarheroua Jan 26, 2024
fcad7aa
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Jan 26, 2024
cc4194a
Added documentation coments
Guitarheroua Jan 26, 2024
61b2bf8
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Jan 27, 2024
e095d0a
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
franklywatson Jan 30, 2024
0ed0f0b
Apply suggestions from code review
Guitarheroua Feb 6, 2024
c918d35
Replace transaction result query with index query mode
Guitarheroua Feb 6, 2024
cfc9ead
Moved tx errors callbacks to structure
Guitarheroua Feb 8, 2024
3acd3a9
Failover on any error frol local storage in IndexQueryModeFailover
Guitarheroua Feb 8, 2024
0a8c56e
Make transaction error message interface
Guitarheroua Feb 8, 2024
1e53c22
Refactored deriveTransactionStatus
Guitarheroua Feb 8, 2024
f72aaa8
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Guitarheroua Feb 8, 2024
721c666
Added tx result indexer
Guitarheroua Feb 9, 2024
c9c0675
removed unnecessary files
Guitarheroua Feb 9, 2024
1d0e625
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 9, 2024
5ec4c39
Fixxed issue with tx status
Guitarheroua Feb 9, 2024
29ab84c
linted
Guitarheroua Feb 9, 2024
aeff1d5
Apply suggestions from code review
Guitarheroua Feb 19, 2024
99ca5ba
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 19, 2024
2cd44dd
linted
Guitarheroua Feb 19, 2024
2131d5e
Fixed remarks
Guitarheroua Feb 19, 2024
9396412
Fixed remarks
Guitarheroua Feb 19, 2024
0abf946
Added coments
Guitarheroua Feb 19, 2024
95fb7e9
Renamed event index getter methods
Guitarheroua Feb 21, 2024
88be08c
Apply suggestions from code review
Guitarheroua Feb 21, 2024
e8baab3
Fixed remark with NotFound error.
Guitarheroua Feb 21, 2024
9b6990f
Merge branch 'master' of github.com:Guitarheroua/flow-go into guitarh…
Guitarheroua Feb 21, 2024
5e638af
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 21, 2024
8a3e5c9
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 21, 2024
bff40e9
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 21, 2024
0d6a002
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 22, 2024
f7f6d15
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 23, 2024
930cb8c
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 23, 2024
6cbf92e
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 26, 2024
54dcfe6
linted
Guitarheroua Feb 26, 2024
4760e29
Added comment to converter
Guitarheroua Feb 27, 2024
ff5ff58
Apply suggestions from code review
Guitarheroua Feb 27, 2024
c769b93
Added documentation for indexers
Guitarheroua Feb 27, 2024
1ff2c41
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
Guitarheroua Feb 27, 2024
5e2df57
Added documentation according to review
Guitarheroua Feb 27, 2024
58149ca
Fixed unit tests remarks
Guitarheroua Feb 27, 2024
41ea906
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
franklywatson Feb 27, 2024
e390005
Apply suggestions from code review
Guitarheroua Feb 28, 2024
c7c489b
Fixed remarks
Guitarheroua Feb 28, 2024
b2e15a7
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
franklywatson Feb 28, 2024
b954a35
Merge branch 'master' into guitarheroua/4753-local-index-tx-result
franklywatson Feb 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
},
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
EventQueryMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly.String(), // default to ENs only for now
},
RestConfig: rest.Config{
ListenAddress: "",
Expand Down Expand Up @@ -283,6 +284,7 @@ type FlowAccessNodeBuilder struct {
ScriptExecutor *backend.ScriptExecutor
RegistersAsyncStore *execution.RegistersAsyncStore
EventsIndex *backend.EventsIndex
TxResultsIndex *backend.TransactionResultsIndex
IndexerDependencies *cmd.DependencyList

// The sync engine participants provider is the libp2p peer store for the access node
Expand Down Expand Up @@ -838,6 +840,11 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil, err
}

err = builder.TxResultsIndex.Initialize(builder.ExecutionIndexer)
if err != nil {
return nil, err
}

err = builder.RegistersAsyncStore.Initialize(registers)
if err != nil {
return nil, err
Expand Down Expand Up @@ -933,7 +940,6 @@ func FlowAccessNode(nodeBuilder *cmd.FlowNodeBuilder) *FlowAccessNodeBuilder {
}

func (builder *FlowAccessNodeBuilder) ParseFlags() error {

builder.BaseFlags()

builder.extraFlags()
Expand Down Expand Up @@ -1140,6 +1146,11 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
defaultConfig.rpcConf.BackendConfig.EventQueryMode,
"mode to use when querying events. one of [local-only, execution-nodes-only(default), failover]")

flags.StringVar(&builder.rpcConf.BackendConfig.TxResultQueryMode,
"tx-result-query-mode",
defaultConfig.rpcConf.BackendConfig.TxResultQueryMode,
"mode to use when querying transaction results. one of [local-only, execution-nodes-only(default), failover]")

// Script Execution
flags.StringVar(&builder.rpcConf.BackendConfig.ScriptExecutionMode,
"script-execution-mode",
Expand Down Expand Up @@ -1169,7 +1180,6 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"script-execution-max-height",
defaultConfig.scriptExecMaxBlock,
"highest block height to allow for script execution. default: no limit")

}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down Expand Up @@ -1498,6 +1508,10 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.EventsIndex = backend.NewEventsIndex(builder.Storage.Events)
return nil
}).
Module("transaction result index", func(node *cmd.NodeConfig) error {
builder.TxResultsIndex = backend.NewTransactionResultsIndex(builder.Storage.LightTransactionResults)
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
config := builder.rpcConf
backendConfig := config.BackendConfig
Expand Down Expand Up @@ -1537,12 +1551,20 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {

eventQueryMode, err := backend.ParseIndexQueryMode(config.BackendConfig.EventQueryMode)
if err != nil {
return nil, fmt.Errorf("could not parse script execution mode: %w", err)
return nil, fmt.Errorf("could not parse event query mode: %w", err)
Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
}
if eventQueryMode == backend.IndexQueryModeCompare {
return nil, fmt.Errorf("event query mode 'compare' is not supported")
}

txResultQueryMode, err := backend.ParseIndexQueryMode(config.BackendConfig.TxResultQueryMode)
if err != nil {
return nil, fmt.Errorf("could not parse transaction result query mode: %w", err)
}
if txResultQueryMode == backend.IndexQueryModeCompare {
return nil, fmt.Errorf("transaction result query mode 'compare' is not supported")
}

Guitarheroua marked this conversation as resolved.
Show resolved Hide resolved
nodeBackend, err := backend.New(backend.Params{
State: node.State,
CollectionRPC: builder.CollectionRPC,
Expand All @@ -1569,6 +1591,8 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
ScriptExecutionMode: scriptExecMode,
EventQueryMode: eventQueryMode,
EventsIndex: builder.EventsIndex,
TxResultQueryMode: txResultQueryMode,
TxResultsIndex: builder.TxResultsIndex,
})
if err != nil {
return nil, fmt.Errorf("could not initialize backend: %w", err)
Expand Down Expand Up @@ -1699,7 +1723,6 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.nodeInfoFile,
node.PingService,
)

if err != nil {
return nil, fmt.Errorf("could not create ping engine: %w", err)
}
Expand Down Expand Up @@ -1801,7 +1824,8 @@ func (builder *FlowAccessNodeBuilder) enqueuePublicNetworkInit() {
// - The libp2p node instance for the public network.
// - Any error encountered during initialization. Any error should be considered fatal.
func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.PrivateKey, bindAddress string, networkMetrics module.LibP2PMetrics) (p2p.LibP2PNode,
error) {
error,
) {
connManager, err := connection.NewConnManager(builder.Logger, networkMetrics, &builder.FlowConfig.NetworkConfig.ConnectionManager)
if err != nil {
return nil, fmt.Errorf("could not create connection manager: %w", err)
Expand Down Expand Up @@ -1837,7 +1861,6 @@ func (builder *FlowAccessNodeBuilder) initPublicLibp2pNode(networkKey crypto.Pri
return dht.NewDHT(ctx, h, protocols.FlowPublicDHTProtocolID(builder.SporkID), builder.Logger, networkMetrics, dht.AsServer())
}).
Build()

if err != nil {
return nil, fmt.Errorf("could not build libp2p node for staked access node: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions engine/access/access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,7 @@ func (suite *Suite) TestGetSealedTransaction() {
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxErrorMessagesCacheSize: 1000,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -794,6 +795,7 @@ func (suite *Suite) TestGetTransactionResult() {
SnapshotHistoryLimit: backend.DefaultSnapshotHistoryLimit,
Communicator: backend.NewNodeCommunicator(false),
TxErrorMessagesCacheSize: 1000,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
})
require.NoError(suite.T(), err)

Expand Down Expand Up @@ -987,6 +989,7 @@ func (suite *Suite) TestExecuteScript() {
Communicator: backend.NewNodeCommunicator(false),
ScriptExecutionMode: backend.IndexQueryModeExecutionNodesOnly,
TxErrorMessagesCacheSize: 1000,
TxResultQueryMode: backend.IndexQueryModeExecutionNodesOnly,
})
require.NoError(suite.T(), err)

Expand Down
31 changes: 19 additions & 12 deletions engine/access/rpc/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ const DefaultLoggedScriptsCacheSize = 1_000_000
// DefaultConnectionPoolSize is the default size for the connection pool to collection and execution nodes
const DefaultConnectionPoolSize = 250

var preferredENIdentifiers flow.IdentifierList
var fixedENIdentifiers flow.IdentifierList
var (
preferredENIdentifiers flow.IdentifierList
fixedENIdentifiers flow.IdentifierList
)

// Backend implements the Access API.
//
Expand Down Expand Up @@ -87,7 +89,6 @@ type Params struct {
Transactions storage.Transactions
ExecutionReceipts storage.ExecutionReceipts
ExecutionResults storage.ExecutionResults
LightTransactionResults storage.LightTransactionResults
ChainID flow.ChainID
AccessMetrics module.AccessMetrics
ConnFactory connection.ConnectionFactory
Expand All @@ -104,8 +105,12 @@ type Params struct {
ScriptExecutionMode IndexQueryMode
EventQueryMode IndexQueryMode
EventsIndex *EventsIndex
TxResultQueryMode IndexQueryMode
TxResultsIndex *TransactionResultsIndex
}

var _ TransactionErrorMessage = (*Backend)(nil)

// New creates backend instance
func New(params Params) (*Backend, error) {
retry := newRetry(params.Log)
Expand Down Expand Up @@ -157,14 +162,17 @@ func New(params Params) (*Backend, error) {
scriptExecMode: params.ScriptExecutionMode,
},
backendTransactions: backendTransactions{
TransactionsLocalDataProvider: TransactionsLocalDataProvider{
state: params.State,
collections: params.Collections,
blocks: params.Blocks,
eventsIndex: params.EventsIndex,
txResultsIndex: params.TxResultsIndex,
},
log: params.Log,
staticCollectionRPC: params.CollectionRPC,
state: params.State,
chainID: params.ChainID,
collections: params.Collections,
blocks: params.Blocks,
transactions: params.Transactions,
results: params.LightTransactionResults,
executionReceipts: params.ExecutionReceipts,
transactionValidator: configureTransactionValidator(params.State, params.ChainID),
transactionMetrics: params.AccessMetrics,
Expand All @@ -174,6 +182,7 @@ func New(params Params) (*Backend, error) {
nodeCommunicator: params.Communicator,
txResultCache: txResCache,
txErrorMessagesCache: txErrorMessagesCache,
txResultQueryMode: params.TxResultQueryMode,
},
backendEvents: backendEvents{
log: params.Log,
Expand Down Expand Up @@ -221,6 +230,8 @@ func New(params Params) (*Backend, error) {
nodeInfo: nodeInfo,
}

b.backendTransactions.txErrorMessages = b

retry.SetBackend(b)

preferredENIdentifiers, err = identifierList(params.PreferredExecutionNodeIDs)
Expand Down Expand Up @@ -267,7 +278,6 @@ func configureTransactionValidator(state protocol.State, chainID flow.ChainID) *

// Ping responds to requests when the server is up.
func (b *Backend) Ping(ctx context.Context) error {

// staticCollectionRPC is only set if a collection node address was provided at startup
if b.staticCollectionRPC != nil {
_, err := b.staticCollectionRPC.Ping(ctx, &accessproto.PingRequest{})
Expand Down Expand Up @@ -336,7 +346,6 @@ func executionNodesForBlockID(
state protocol.State,
log zerolog.Logger,
) (flow.IdentitySkeletonList, error) {

var (
executorIDs flow.IdentifierList
err error
Expand Down Expand Up @@ -377,7 +386,7 @@ func executionNodesForBlockID(
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(100 * time.Millisecond << time.Duration(attempt)):
//retry after an exponential backoff
// retry after an exponential backoff
}
}

Expand Down Expand Up @@ -412,7 +421,6 @@ func findAllExecutionNodes(
executionReceipts storage.ExecutionReceipts,
log zerolog.Logger,
) (flow.IdentifierList, error) {

// lookup the receipt's storage with the block ID
allReceipts, err := executionReceipts.ByBlockID(blockID)
if err != nil {
Expand Down Expand Up @@ -469,7 +477,6 @@ func findAllExecutionNodes(
// e.g. If execution nodes in identity table are {1,2,3,4}, preferred ENs are defined as {2,3,4}
// and the executor IDs is {1,2,3}, then {2, 3} is returned as the chosen subset of ENs
func chooseExecutionNodes(state protocol.State, executorIDs flow.IdentifierList) (flow.IdentitySkeletonList, error) {

allENs, err := state.Final().Identities(filter.HasRole[flow.Identity](flow.RoleExecution))
if err != nil {
return nil, fmt.Errorf("failed to retreive all execution IDs: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,5 +353,5 @@ func convertAccountError(err error, address flow.Address, height uint64) error {
return status.Errorf(codes.NotFound, "account not found")
}

return convertIndexError(err, height, "failed to get account")
return rpc.ConvertIndexError(err, height, "failed to get account")
}
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/backend_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (b *backendEvents) getBlockEventsFromStorage(
return nil, nil, rpc.ConvertError(ctx.Err(), "failed to get events from storage", codes.Canceled)
}

events, err := b.eventsIndex.GetEvents(blockInfo.ID, blockInfo.Height)
events, err := b.eventsIndex.ByBlockID(blockInfo.ID, blockInfo.Height)
if err != nil {
if errors.Is(err, storage.ErrNotFound) ||
errors.Is(err, storage.ErrHeightNotIndexed) ||
Expand Down
25 changes: 1 addition & 24 deletions engine/access/rpc/backend/backend_scripts.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package backend
import (
"context"
"crypto/md5" //nolint:gosec
"errors"
"time"

lru "github.com/hashicorp/golang-lru/v2"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/state_synchronization/indexer"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
Expand Down Expand Up @@ -354,26 +352,5 @@ func convertScriptExecutionError(err error, height uint64) error {
}
}

return convertIndexError(err, height, "failed to execute script")
}

// convertIndexError converts errors related to index to a gRPC error
func convertIndexError(err error, height uint64, defaultMsg string) error {
if err == nil {
return nil
}

if errors.Is(err, indexer.ErrIndexNotInitialized) {
return status.Errorf(codes.FailedPrecondition, "data for block is not available: %v", err)
}

if errors.Is(err, storage.ErrHeightNotIndexed) {
return status.Errorf(codes.OutOfRange, "data for block height %d is not available", height)
}

if errors.Is(err, storage.ErrNotFound) {
return status.Errorf(codes.NotFound, "data not found: %v", err)
}

return rpc.ConvertError(err, defaultMsg, codes.Internal)
return rpc.ConvertIndexError(err, height, "failed to execute script")
}
4 changes: 3 additions & 1 deletion engine/access/rpc/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type Suite struct {
receipts *storagemock.ExecutionReceipts
results *storagemock.ExecutionResults
transactionResults *storagemock.LightTransactionResults
events *storagemock.Events

colClient *access.AccessAPIClient
execClient *access.ExecutionAPIClient
Expand Down Expand Up @@ -100,6 +101,7 @@ func (suite *Suite) SetupTest() {
suite.colClient = new(access.AccessAPIClient)
suite.execClient = new(access.ExecutionAPIClient)
suite.transactionResults = storagemock.NewLightTransactionResults(suite.T())
suite.events = storagemock.NewEvents(suite.T())
suite.chainID = flow.Testnet
suite.historicalAccessClient = new(access.AccessAPIClient)
suite.connectionFactory = connectionmock.NewConnectionFactory(suite.T())
Expand Down Expand Up @@ -2138,7 +2140,6 @@ func (suite *Suite) defaultBackendParams() Params {
Transactions: suite.transactions,
ExecutionReceipts: suite.receipts,
ExecutionResults: suite.results,
LightTransactionResults: suite.transactionResults,
ChainID: suite.chainID,
CollectionRPC: suite.colClient,
MaxHeightRange: DefaultMaxHeightRange,
Expand All @@ -2147,5 +2148,6 @@ func (suite *Suite) defaultBackendParams() Params {
AccessMetrics: metrics.NewNoopCollector(),
Log: suite.log,
TxErrorMessagesCacheSize: 1000,
TxResultQueryMode: IndexQueryModeExecutionNodesOnly,
}
}
Loading
Loading