diff --git a/access/handler.go b/access/handler.go index 404bfa81318..11e47dd3521 100644 --- a/access/handler.go +++ b/access/handler.go @@ -3,17 +3,17 @@ package access import ( "context" - "github.com/onflow/flow/protobuf/go/flow/access" - "github.com/onflow/flow/protobuf/go/flow/entities" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/timestamppb" "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/consensus/hotstuff/signature" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + + "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" ) type Handler struct { @@ -516,7 +516,7 @@ func (h *Handler) GetEventsForHeightRange( return nil, err } - resultEvents, err := blockEventsToMessages(results) + resultEvents, err := convert.BlockEventsToMessages(results) if err != nil { return nil, err } @@ -548,7 +548,7 @@ func (h *Handler) GetEventsForBlockIDs( return nil, err } - resultEvents, err := blockEventsToMessages(results) + resultEvents, err := convert.BlockEventsToMessages(results) if err != nil { return nil, err } @@ -590,6 +590,27 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access. return executionResultToMessages(result, metadata) } +// GetExecutionResultByID returns the execution result for the given ID. +func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) { + metadata := h.buildMetadataResponse() + + blockID := convert.MessageToIdentifier(req.GetId()) + + result, err := h.api.GetExecutionResultByID(ctx, blockID) + if err != nil { + return nil, err + } + + execResult, err := convert.ExecutionResultToMessage(result) + if err != nil { + return nil, err + } + return &access.ExecutionResultByIDResponse{ + ExecutionResult: execResult, + Metadata: metadata, + }, nil +} + func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) { metadata := h.buildMetadataResponse() @@ -659,34 +680,6 @@ func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Meta }, nil } -func blockEventsToMessages(blocks []flow.BlockEvents) ([]*access.EventsResponse_Result, error) { - results := make([]*access.EventsResponse_Result, len(blocks)) - - for i, block := range blocks { - event, err := blockEventsToMessage(block) - if err != nil { - return nil, err - } - results[i] = event - } - - return results, nil -} - -func blockEventsToMessage(block flow.BlockEvents) (*access.EventsResponse_Result, error) { - eventMessages := make([]*entities.Event, len(block.Events)) - for i, event := range block.Events { - eventMessages[i] = convert.EventToMessage(event) - } - timestamp := timestamppb.New(block.BlockTimestamp) - return &access.EventsResponse_Result{ - BlockId: block.BlockID[:], - BlockHeight: block.BlockHeight, - BlockTimestamp: timestamp, - Events: eventMessages, - }, nil -} - // WithBlockSignerDecoder configures the Handler to decode signer indices // via the provided hotstuff.BlockSignerDecoder func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) func(*Handler) { diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index a39cf32f350..0607870cf53 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -37,7 +37,7 @@ import ( "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/engine/access/ingestion" pingeng "github.com/onflow/flow-go/engine/access/ping" - "github.com/onflow/flow-go/engine/access/rest" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc" "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/engine/access/state_stream" @@ -147,20 +147,22 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { collectionGRPCPort: 9000, executionGRPCPort: 9000, rpcConf: rpc.Config{ - UnsecureGRPCListenAddr: "0.0.0.0:9000", - SecureGRPCListenAddr: "0.0.0.0:9001", - HTTPListenAddr: "0.0.0.0:8000", - RESTListenAddr: "", - CollectionAddr: "", - HistoricalAccessAddrs: "", - CollectionClientTimeout: 3 * time.Second, - ExecutionClientTimeout: 3 * time.Second, - ConnectionPoolSize: backend.DefaultConnectionPoolSize, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: nil, - FixedExecutionNodeIDs: nil, - ArchiveAddressList: nil, - MaxMsgSize: grpcutils.DefaultMaxMsgSize, + UnsecureGRPCListenAddr: "0.0.0.0:9000", + SecureGRPCListenAddr: "0.0.0.0:9001", + HTTPListenAddr: "0.0.0.0:8000", + RESTListenAddr: "", + CollectionAddr: "", + HistoricalAccessAddrs: "", + BackendConfig: backend.Config{ + CollectionClientTimeout: 3 * time.Second, + ExecutionClientTimeout: 3 * time.Second, + ConnectionPoolSize: backend.DefaultConnectionPoolSize, + MaxHeightRange: backend.DefaultMaxHeightRange, + PreferredExecutionNodeIDs: nil, + FixedExecutionNodeIDs: nil, + ArchiveAddressList: nil, + }, + MaxMsgSize: grpcutils.DefaultMaxMsgSize, }, stateStreamConf: state_stream.Config{ MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize, @@ -667,15 +669,15 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)") flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to") flags.StringVarP(&builder.ExecutionNodeAddress, "script-addr", "s", defaultConfig.ExecutionNodeAddress, "the address (of the execution node) forward the script to") - flags.StringSliceVar(&builder.rpcConf.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.ArchiveAddressList, "the list of address of the archive node to forward the script queries to") + flags.StringSliceVar(&builder.rpcConf.BackendConfig.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.BackendConfig.ArchiveAddressList, "the list of address of the archive node to forward the script queries to") flags.StringVarP(&builder.rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", defaultConfig.rpcConf.HistoricalAccessAddrs, "comma separated rpc addresses for historical access nodes") - flags.DurationVar(&builder.rpcConf.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.CollectionClientTimeout, "grpc client timeout for a collection node") - flags.DurationVar(&builder.rpcConf.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.ExecutionClientTimeout, "grpc client timeout for an execution node") - flags.UintVar(&builder.rpcConf.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size") + flags.DurationVar(&builder.rpcConf.BackendConfig.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.BackendConfig.CollectionClientTimeout, "grpc client timeout for a collection node") + flags.DurationVar(&builder.rpcConf.BackendConfig.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.BackendConfig.ExecutionClientTimeout, "grpc client timeout for an execution node") + flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size") flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc") - flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests") - flags.StringSliceVar(&builder.rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.") - flags.StringSliceVar(&builder.rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.") + flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests") + flags.StringSliceVar(&builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.") + flags.StringSliceVar(&builder.rpcConf.BackendConfig.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.BackendConfig.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.") flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized") flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed") flags.BoolVar(&builder.logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed") @@ -915,7 +917,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.rpcConf.CollectionAddr, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(builder.rpcConf.MaxMsgSize))), grpc.WithTransportCredentials(insecure.NewCredentials()), - backend.WithClientUnaryInterceptor(builder.rpcConf.CollectionClientTimeout)) + backend.WithClientUnaryInterceptor(builder.rpcConf.BackendConfig.CollectionClientTimeout)) if err != nil { return err } @@ -972,7 +974,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil }). Module("rest metrics", func(node *cmd.NodeConfig) error { - m, err := metrics.NewRestCollector(rest.URLToRoute, node.MetricsRegisterer) + m, err := metrics.NewRestCollector(routes.URLToRoute, node.MetricsRegisterer) if err != nil { return err } @@ -1034,10 +1036,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { return nil }). Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { - engineBuilder, err := rpc.NewBuilder( - node.Logger, + config := builder.rpcConf + backendConfig := config.BackendConfig + accessMetrics := builder.AccessMetrics + + backendCache, cacheSize, err := backend.NewCache(node.Logger, + accessMetrics, + backendConfig.ConnectionPoolSize) + if err != nil { + return nil, fmt.Errorf("could not initialize backend cache: %w", err) + } + + connFactory := &backend.ConnectionFactoryImpl{ + CollectionGRPCPort: builder.collectionGRPCPort, + ExecutionGRPCPort: builder.executionGRPCPort, + CollectionNodeGRPCTimeout: backendConfig.CollectionClientTimeout, + ExecutionNodeGRPCTimeout: backendConfig.ExecutionClientTimeout, + ConnectionsCache: backendCache, + CacheSize: cacheSize, + MaxMsgSize: config.MaxMsgSize, + AccessMetrics: accessMetrics, + Log: node.Logger, + } + + backend := backend.New( node.State, - builder.rpcConf, builder.CollectionRPC, builder.HistoricalAccessRPCs, node.Storage.Blocks, @@ -1048,11 +1071,25 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { node.Storage.Results, node.RootChainID, builder.AccessMetrics, - builder.collectionGRPCPort, - builder.executionGRPCPort, + connFactory, builder.retryEnabled, + backendConfig.MaxHeightRange, + backendConfig.PreferredExecutionNodeIDs, + backendConfig.FixedExecutionNodeIDs, + node.Logger, + backend.DefaultSnapshotHistoryLimit, + backendConfig.ArchiveAddressList) + + engineBuilder, err := rpc.NewBuilder( + node.Logger, + node.State, + config, + node.RootChainID, + builder.AccessMetrics, builder.rpcMetricsEnabled, builder.Me, + backend, + backend, builder.secureGrpcServer, builder.unsecureGrpcServer, ) diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 36343507a8f..8fe2a0f8d91 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -28,6 +28,8 @@ import ( recovery "github.com/onflow/flow-go/consensus/recovery/protocol" "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/engine/access/apiproxy" + restapiproxy "github.com/onflow/flow-go/engine/access/rest/apiproxy" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc" "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/engine/common/follower" @@ -110,19 +112,22 @@ type ObserverServiceConfig struct { func DefaultObserverServiceConfig() *ObserverServiceConfig { return &ObserverServiceConfig{ rpcConf: rpc.Config{ - UnsecureGRPCListenAddr: "0.0.0.0:9000", - SecureGRPCListenAddr: "0.0.0.0:9001", - HTTPListenAddr: "0.0.0.0:8000", - RESTListenAddr: "", - CollectionAddr: "", - HistoricalAccessAddrs: "", - CollectionClientTimeout: 3 * time.Second, - ExecutionClientTimeout: 3 * time.Second, - MaxHeightRange: backend.DefaultMaxHeightRange, - PreferredExecutionNodeIDs: nil, - FixedExecutionNodeIDs: nil, - ArchiveAddressList: nil, - MaxMsgSize: grpcutils.DefaultMaxMsgSize, + UnsecureGRPCListenAddr: "0.0.0.0:9000", + SecureGRPCListenAddr: "0.0.0.0:9001", + HTTPListenAddr: "0.0.0.0:8000", + RESTListenAddr: "", + CollectionAddr: "", + HistoricalAccessAddrs: "", + BackendConfig: backend.Config{ + CollectionClientTimeout: 3 * time.Second, + ExecutionClientTimeout: 3 * time.Second, + ConnectionPoolSize: backend.DefaultConnectionPoolSize, + MaxHeightRange: backend.DefaultMaxHeightRange, + PreferredExecutionNodeIDs: nil, + FixedExecutionNodeIDs: nil, + ArchiveAddressList: nil, + }, + MaxMsgSize: grpcutils.DefaultMaxMsgSize, }, rpcMetricsEnabled: false, apiRatelimits: nil, @@ -164,6 +169,8 @@ type ObserverServiceBuilder struct { // Public network peerID peer.ID + RestMetrics *metrics.RestCollector + AccessMetrics module.AccessMetrics // grpc servers secureGrpcServer *grpcserver.GrpcServer unsecureGrpcServer *grpcserver.GrpcServer @@ -451,7 +458,8 @@ func (builder *ObserverServiceBuilder) extraFlags() { flags.StringVarP(&builder.rpcConf.HTTPListenAddr, "http-addr", "h", defaultConfig.rpcConf.HTTPListenAddr, "the address the http proxy server listens on") flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)") flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", defaultConfig.rpcConf.MaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc") - flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests") + flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size") + flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests") flags.StringToIntVar(&builder.apiRatelimits, "api-rate-limits", defaultConfig.apiRatelimits, "per second rate limits for Access API methods e.g. Ping=300,GetTransaction=500 etc.") flags.StringToIntVar(&builder.apiBurstlimits, "api-burst-limits", defaultConfig.apiBurstlimits, "burst limits for Access API methods e.g. Ping=100,GetTransaction=100 etc.") flags.StringVar(&builder.observerNetworkingKeyPath, "observer-networking-key-path", defaultConfig.observerNetworkingKeyPath, "path to the networking key for observer") @@ -867,11 +875,46 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { return nil }) + builder.Module("rest metrics", func(node *cmd.NodeConfig) error { + m, err := metrics.NewRestCollector(routes.URLToRoute, node.MetricsRegisterer) + if err != nil { + return err + } + builder.RestMetrics = m + return nil + }) + builder.Module("access metrics", func(node *cmd.NodeConfig) error { + builder.AccessMetrics = metrics.NewAccessCollector( + metrics.WithRestMetrics(builder.RestMetrics), + ) + return nil + }) builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { - engineBuilder, err := rpc.NewBuilder( - node.Logger, + accessMetrics := builder.AccessMetrics + config := builder.rpcConf + backendConfig := config.BackendConfig + + backendCache, cacheSize, err := backend.NewCache(node.Logger, + accessMetrics, + config.BackendConfig.ConnectionPoolSize) + if err != nil { + return nil, fmt.Errorf("could not initialize backend cache: %w", err) + } + + connFactory := &backend.ConnectionFactoryImpl{ + CollectionGRPCPort: 0, + ExecutionGRPCPort: 0, + CollectionNodeGRPCTimeout: backendConfig.CollectionClientTimeout, + ExecutionNodeGRPCTimeout: backendConfig.ExecutionClientTimeout, + ConnectionsCache: backendCache, + CacheSize: cacheSize, + MaxMsgSize: config.MaxMsgSize, + AccessMetrics: accessMetrics, + Log: node.Logger, + } + + accessBackend := backend.New( node.State, - builder.rpcConf, nil, nil, node.Storage.Blocks, @@ -881,12 +924,39 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { node.Storage.Receipts, node.Storage.Results, node.RootChainID, - metrics.NewNoopCollector(), - 0, - 0, + accessMetrics, + connFactory, false, + backendConfig.MaxHeightRange, + backendConfig.PreferredExecutionNodeIDs, + backendConfig.FixedExecutionNodeIDs, + node.Logger, + backend.DefaultSnapshotHistoryLimit, + backendConfig.ArchiveAddressList) + + observerCollector := metrics.NewObserverCollector() + restHandler, err := restapiproxy.NewRestProxyHandler( + accessBackend, + builder.upstreamIdentities, + builder.apiTimeout, + config.MaxMsgSize, + builder.Logger, + observerCollector, + node.RootChainID.Chain()) + if err != nil { + return nil, err + } + + engineBuilder, err := rpc.NewBuilder( + node.Logger, + node.State, + config, + node.RootChainID, + accessMetrics, builder.rpcMetricsEnabled, builder.Me, + accessBackend, + restHandler, builder.secureGrpcServer, builder.unsecureGrpcServer, ) @@ -895,14 +965,14 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { } // upstream access node forwarder - forwarder, err := apiproxy.NewFlowAccessAPIForwarder(builder.upstreamIdentities, builder.apiTimeout, builder.rpcConf.MaxMsgSize) + forwarder, err := apiproxy.NewFlowAccessAPIForwarder(builder.upstreamIdentities, builder.apiTimeout, config.MaxMsgSize) if err != nil { return nil, err } - proxy := &apiproxy.FlowAccessAPIRouter{ + rpcHandler := &apiproxy.FlowAccessAPIRouter{ Logger: builder.Logger, - Metrics: metrics.NewObserverCollector(), + Metrics: observerCollector, Upstream: forwarder, Observer: protocol.NewHandler(protocol.New( node.State, @@ -914,7 +984,7 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { // build the rpc engine builder.RpcEng, err = engineBuilder. - WithNewHandler(proxy). + WithRpcHandler(rpcHandler). WithLegacy(). Build() if err != nil { diff --git a/engine/access/apiproxy/access_api_proxy.go b/engine/access/apiproxy/access_api_proxy.go index d72ec5bb5e2..f5898686fc6 100644 --- a/engine/access/apiproxy/access_api_proxy.go +++ b/engine/access/apiproxy/access_api_proxy.go @@ -2,26 +2,17 @@ package apiproxy import ( "context" - "fmt" - "sync" "time" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" - "github.com/onflow/flow-go/engine/access/rpc/backend" + "github.com/onflow/flow-go/engine/common/grpc/forwarder" "github.com/onflow/flow-go/engine/protocol" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" - "github.com/onflow/flow-go/utils/grpcutils" ) // FlowAccessAPIRouter is a structure that represents the routing proxy algorithm. @@ -51,88 +42,6 @@ func (h *FlowAccessAPIRouter) log(handler, rpc string, err error) { logger.Info().Msg("request succeeded") } -// reconnectingClient returns an active client, or -// creates one, if the last one is not ready anymore. -func (h *FlowAccessAPIForwarder) reconnectingClient(i int) error { - timeout := h.timeout - - if h.connections[i] == nil || h.connections[i].GetState() != connectivity.Ready { - identity := h.ids[i] - var connection *grpc.ClientConn - var err error - if identity.NetworkPubKey == nil { - connection, err = grpc.Dial( - identity.Address, - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(h.maxMsgSize))), - grpc.WithTransportCredentials(insecure.NewCredentials()), - backend.WithClientUnaryInterceptor(timeout)) - if err != nil { - return err - } - } else { - tlsConfig, err := grpcutils.DefaultClientTLSConfig(identity.NetworkPubKey) - if err != nil { - return fmt.Errorf("failed to get default TLS client config using public flow networking key %s %w", identity.NetworkPubKey.String(), err) - } - - connection, err = grpc.Dial( - identity.Address, - grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(h.maxMsgSize))), - grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), - backend.WithClientUnaryInterceptor(timeout)) - if err != nil { - return fmt.Errorf("cannot connect to %s %w", identity.Address, err) - } - } - connection.Connect() - time.Sleep(1 * time.Second) - state := connection.GetState() - if state != connectivity.Ready && state != connectivity.Connecting { - return fmt.Errorf("%v", state) - } - h.connections[i] = connection - h.upstream[i] = access.NewAccessAPIClient(connection) - } - - return nil -} - -// faultTolerantClient implements an upstream connection that reconnects on errors -// a reasonable amount of time. -func (h *FlowAccessAPIForwarder) faultTolerantClient() (access.AccessAPIClient, error) { - if h.upstream == nil || len(h.upstream) == 0 { - return nil, status.Errorf(codes.Unimplemented, "method not implemented") - } - - // Reasoning: A retry count of three gives an acceptable 5% failure ratio from a 37% failure ratio. - // A bigger number is problematic due to the DNS resolve and connection times, - // plus the need to log and debug each individual connection failure. - // - // This reasoning eliminates the need of making this parameter configurable. - // The logic works rolling over a single connection as well making clean code. - const retryMax = 3 - - h.lock.Lock() - defer h.lock.Unlock() - - var err error - for i := 0; i < retryMax; i++ { - h.roundRobin++ - h.roundRobin = h.roundRobin % len(h.upstream) - err = h.reconnectingClient(h.roundRobin) - if err != nil { - continue - } - state := h.connections[h.roundRobin].GetState() - if state != connectivity.Ready && state != connectivity.Connecting { - continue - } - return h.upstream[h.roundRobin], nil - } - - return nil, status.Errorf(codes.Unavailable, err.Error()) -} - // Ping pings the service. It is special in the sense that it responds successful, // only if all underlying services are ready. func (h *FlowAccessAPIRouter) Ping(context context.Context, req *access.PingRequest) (*access.PingResponse, error) { @@ -290,54 +199,33 @@ func (h *FlowAccessAPIRouter) GetExecutionResultForBlockID(context context.Conte return res, err } +func (h *FlowAccessAPIRouter) GetExecutionResultByID(context context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) { + res, err := h.Upstream.GetExecutionResultByID(context, req) + h.log("upstream", "GetExecutionResultByID", err) + return res, err +} + // FlowAccessAPIForwarder forwards all requests to a set of upstream access nodes or observers type FlowAccessAPIForwarder struct { - lock sync.Mutex - roundRobin int - ids flow.IdentityList - upstream []access.AccessAPIClient - connections []*grpc.ClientConn - timeout time.Duration - maxMsgSize uint + *forwarder.Forwarder } func NewFlowAccessAPIForwarder(identities flow.IdentityList, timeout time.Duration, maxMsgSize uint) (*FlowAccessAPIForwarder, error) { - forwarder := &FlowAccessAPIForwarder{maxMsgSize: maxMsgSize} - err := forwarder.setFlowAccessAPI(identities, timeout) - return forwarder, err -} - -// setFlowAccessAPI sets a backend access API that forwards some requests to an upstream node. -// It is used by Observer services, Blockchain Data Service, etc. -// Make sure that this is just for observation and not a staked participant in the flow network. -// This means that observers see a copy of the data but there is no interaction to ensure integrity from the root block. -func (ret *FlowAccessAPIForwarder) setFlowAccessAPI(accessNodeAddressAndPort flow.IdentityList, timeout time.Duration) error { - ret.timeout = timeout - ret.ids = accessNodeAddressAndPort - ret.upstream = make([]access.AccessAPIClient, accessNodeAddressAndPort.Count()) - ret.connections = make([]*grpc.ClientConn, accessNodeAddressAndPort.Count()) - for i, identity := range accessNodeAddressAndPort { - // Store the faultTolerantClient setup parameters such as address, public, key and timeout, so that - // we can refresh the API on connection loss - ret.ids[i] = identity - - // We fail on any single error on startup, so that - // we identify bootstrapping errors early - err := ret.reconnectingClient(i) - if err != nil { - return err - } + forwarder, err := forwarder.NewForwarder(identities, timeout, maxMsgSize) + if err != nil { + return nil, err } - ret.roundRobin = 0 - return nil + return &FlowAccessAPIForwarder{ + Forwarder: forwarder, + }, nil } // Ping pings the service. It is special in the sense that it responds successful, // only if all underlying services are ready. func (h *FlowAccessAPIForwarder) Ping(context context.Context, req *access.PingRequest) (*access.PingResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -346,7 +234,7 @@ func (h *FlowAccessAPIForwarder) Ping(context context.Context, req *access.PingR func (h *FlowAccessAPIForwarder) GetNodeVersionInfo(context context.Context, req *access.GetNodeVersionInfoRequest) (*access.GetNodeVersionInfoResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -355,7 +243,7 @@ func (h *FlowAccessAPIForwarder) GetNodeVersionInfo(context context.Context, req func (h *FlowAccessAPIForwarder) GetLatestBlockHeader(context context.Context, req *access.GetLatestBlockHeaderRequest) (*access.BlockHeaderResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -364,7 +252,7 @@ func (h *FlowAccessAPIForwarder) GetLatestBlockHeader(context context.Context, r func (h *FlowAccessAPIForwarder) GetBlockHeaderByID(context context.Context, req *access.GetBlockHeaderByIDRequest) (*access.BlockHeaderResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -373,7 +261,7 @@ func (h *FlowAccessAPIForwarder) GetBlockHeaderByID(context context.Context, req func (h *FlowAccessAPIForwarder) GetBlockHeaderByHeight(context context.Context, req *access.GetBlockHeaderByHeightRequest) (*access.BlockHeaderResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -382,7 +270,7 @@ func (h *FlowAccessAPIForwarder) GetBlockHeaderByHeight(context context.Context, func (h *FlowAccessAPIForwarder) GetLatestBlock(context context.Context, req *access.GetLatestBlockRequest) (*access.BlockResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -391,7 +279,7 @@ func (h *FlowAccessAPIForwarder) GetLatestBlock(context context.Context, req *ac func (h *FlowAccessAPIForwarder) GetBlockByID(context context.Context, req *access.GetBlockByIDRequest) (*access.BlockResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -400,7 +288,7 @@ func (h *FlowAccessAPIForwarder) GetBlockByID(context context.Context, req *acce func (h *FlowAccessAPIForwarder) GetBlockByHeight(context context.Context, req *access.GetBlockByHeightRequest) (*access.BlockResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -409,7 +297,7 @@ func (h *FlowAccessAPIForwarder) GetBlockByHeight(context context.Context, req * func (h *FlowAccessAPIForwarder) GetCollectionByID(context context.Context, req *access.GetCollectionByIDRequest) (*access.CollectionResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -418,7 +306,7 @@ func (h *FlowAccessAPIForwarder) GetCollectionByID(context context.Context, req func (h *FlowAccessAPIForwarder) SendTransaction(context context.Context, req *access.SendTransactionRequest) (*access.SendTransactionResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -427,7 +315,7 @@ func (h *FlowAccessAPIForwarder) SendTransaction(context context.Context, req *a func (h *FlowAccessAPIForwarder) GetTransaction(context context.Context, req *access.GetTransactionRequest) (*access.TransactionResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -436,7 +324,7 @@ func (h *FlowAccessAPIForwarder) GetTransaction(context context.Context, req *ac func (h *FlowAccessAPIForwarder) GetTransactionResult(context context.Context, req *access.GetTransactionRequest) (*access.TransactionResultResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -445,7 +333,7 @@ func (h *FlowAccessAPIForwarder) GetTransactionResult(context context.Context, r func (h *FlowAccessAPIForwarder) GetTransactionResultByIndex(context context.Context, req *access.GetTransactionByIndexRequest) (*access.TransactionResultResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -454,7 +342,7 @@ func (h *FlowAccessAPIForwarder) GetTransactionResultByIndex(context context.Con func (h *FlowAccessAPIForwarder) GetTransactionResultsByBlockID(context context.Context, req *access.GetTransactionsByBlockIDRequest) (*access.TransactionResultsResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -462,7 +350,7 @@ func (h *FlowAccessAPIForwarder) GetTransactionResultsByBlockID(context context. } func (h *FlowAccessAPIForwarder) GetTransactionsByBlockID(context context.Context, req *access.GetTransactionsByBlockIDRequest) (*access.TransactionsResponse, error) { - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -471,7 +359,7 @@ func (h *FlowAccessAPIForwarder) GetTransactionsByBlockID(context context.Contex func (h *FlowAccessAPIForwarder) GetAccount(context context.Context, req *access.GetAccountRequest) (*access.GetAccountResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -480,7 +368,7 @@ func (h *FlowAccessAPIForwarder) GetAccount(context context.Context, req *access func (h *FlowAccessAPIForwarder) GetAccountAtLatestBlock(context context.Context, req *access.GetAccountAtLatestBlockRequest) (*access.AccountResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -489,7 +377,7 @@ func (h *FlowAccessAPIForwarder) GetAccountAtLatestBlock(context context.Context func (h *FlowAccessAPIForwarder) GetAccountAtBlockHeight(context context.Context, req *access.GetAccountAtBlockHeightRequest) (*access.AccountResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -498,7 +386,7 @@ func (h *FlowAccessAPIForwarder) GetAccountAtBlockHeight(context context.Context func (h *FlowAccessAPIForwarder) ExecuteScriptAtLatestBlock(context context.Context, req *access.ExecuteScriptAtLatestBlockRequest) (*access.ExecuteScriptResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -507,7 +395,7 @@ func (h *FlowAccessAPIForwarder) ExecuteScriptAtLatestBlock(context context.Cont func (h *FlowAccessAPIForwarder) ExecuteScriptAtBlockID(context context.Context, req *access.ExecuteScriptAtBlockIDRequest) (*access.ExecuteScriptResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -516,7 +404,7 @@ func (h *FlowAccessAPIForwarder) ExecuteScriptAtBlockID(context context.Context, func (h *FlowAccessAPIForwarder) ExecuteScriptAtBlockHeight(context context.Context, req *access.ExecuteScriptAtBlockHeightRequest) (*access.ExecuteScriptResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -525,7 +413,7 @@ func (h *FlowAccessAPIForwarder) ExecuteScriptAtBlockHeight(context context.Cont func (h *FlowAccessAPIForwarder) GetEventsForHeightRange(context context.Context, req *access.GetEventsForHeightRangeRequest) (*access.EventsResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -534,7 +422,7 @@ func (h *FlowAccessAPIForwarder) GetEventsForHeightRange(context context.Context func (h *FlowAccessAPIForwarder) GetEventsForBlockIDs(context context.Context, req *access.GetEventsForBlockIDsRequest) (*access.EventsResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -543,7 +431,7 @@ func (h *FlowAccessAPIForwarder) GetEventsForBlockIDs(context context.Context, r func (h *FlowAccessAPIForwarder) GetNetworkParameters(context context.Context, req *access.GetNetworkParametersRequest) (*access.GetNetworkParametersResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -552,7 +440,7 @@ func (h *FlowAccessAPIForwarder) GetNetworkParameters(context context.Context, r func (h *FlowAccessAPIForwarder) GetLatestProtocolStateSnapshot(context context.Context, req *access.GetLatestProtocolStateSnapshotRequest) (*access.ProtocolStateSnapshotResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } @@ -561,9 +449,18 @@ func (h *FlowAccessAPIForwarder) GetLatestProtocolStateSnapshot(context context. func (h *FlowAccessAPIForwarder) GetExecutionResultForBlockID(context context.Context, req *access.GetExecutionResultForBlockIDRequest) (*access.ExecutionResultForBlockIDResponse, error) { // This is a passthrough request - upstream, err := h.faultTolerantClient() + upstream, err := h.FaultTolerantClient() if err != nil { return nil, err } return upstream.GetExecutionResultForBlockID(context, req) } + +func (h *FlowAccessAPIForwarder) GetExecutionResultByID(context context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) { + // This is a passthrough request + upstream, err := h.FaultTolerantClient() + if err != nil { + return nil, err + } + return upstream.GetExecutionResultByID(context, req) +} diff --git a/engine/access/apiproxy/access_api_proxy_test.go b/engine/access/apiproxy/access_api_proxy_test.go index 9f5a5aa74b8..d20c5ee705d 100644 --- a/engine/access/apiproxy/access_api_proxy_test.go +++ b/engine/access/apiproxy/access_api_proxy_test.go @@ -11,6 +11,7 @@ import ( "google.golang.org/grpc" grpcinsecure "google.golang.org/grpc/credentials/insecure" + "github.com/onflow/flow-go/engine/common/grpc/forwarder" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/unittest" @@ -137,7 +138,8 @@ func TestNewFlowCachedAccessAPIProxy(t *testing.T) { // Prepare a proxy that fails due to the second connection being idle l := flow.IdentityList{{Address: unittest.IPPort("11634")}, {Address: unittest.IPPort("11635")}} c := FlowAccessAPIForwarder{} - err = c.setFlowAccessAPI(l, time.Second) + c.Forwarder, err = forwarder.NewForwarder(l, time.Second, grpcutils.DefaultMaxMsgSize) + if err == nil { t.Fatal(fmt.Errorf("should not start with one connection ready")) } @@ -153,7 +155,7 @@ func TestNewFlowCachedAccessAPIProxy(t *testing.T) { // Prepare a proxy l = flow.IdentityList{{Address: unittest.IPPort("11634")}, {Address: unittest.IPPort("11635")}} c = FlowAccessAPIForwarder{} - err = c.setFlowAccessAPI(l, time.Second) + c.Forwarder, err = forwarder.NewForwarder(l, time.Second, grpcutils.DefaultMaxMsgSize) if err != nil { t.Fatal(err) } diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go index e5fa9dbe4c9..af658b390fd 100644 --- a/engine/access/integration_unsecure_grpc_server_test.go +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -2,6 +2,7 @@ package access import ( "context" + "io" "os" "testing" @@ -19,6 +20,7 @@ import ( "github.com/onflow/flow-go/engine" accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc" + "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/engine/access/state_stream" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/blobs" @@ -167,11 +169,8 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { block := unittest.BlockHeaderFixture() suite.snapshot.On("Head").Return(block, nil) - // create rpc engine builder - rpcEngBuilder, err := rpc.NewBuilder( - suite.log, + backend := backend.New( suite.state, - config, suite.collClient, nil, suite.blocks, @@ -182,11 +181,26 @@ func (suite *SameGRPCPortTestSuite) SetupTest() { nil, suite.chainID, suite.metrics, + nil, + false, 0, + nil, + nil, + suite.log, 0, - false, + nil) + + // create rpc engine builder + rpcEngBuilder, err := rpc.NewBuilder( + suite.log, + suite.state, + config, + suite.chainID, + suite.metrics, false, suite.me, + backend, + backend, suite.secureGrpcServer, suite.unsecureGrpcServer, ) diff --git a/engine/access/mock/access_api_client.go b/engine/access/mock/access_api_client.go index 234e4ffcdee..4e2b1d065c7 100644 --- a/engine/access/mock/access_api_client.go +++ b/engine/access/mock/access_api_client.go @@ -446,6 +446,39 @@ func (_m *AccessAPIClient) GetEventsForHeightRange(ctx context.Context, in *acce return r0, r1 } +// GetExecutionResultByID provides a mock function with given fields: ctx, in, opts +func (_m *AccessAPIClient) GetExecutionResultByID(ctx context.Context, in *access.GetExecutionResultByIDRequest, opts ...grpc.CallOption) (*access.ExecutionResultByIDResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *access.ExecutionResultByIDResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *access.GetExecutionResultByIDRequest, ...grpc.CallOption) (*access.ExecutionResultByIDResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *access.GetExecutionResultByIDRequest, ...grpc.CallOption) *access.ExecutionResultByIDResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*access.ExecutionResultByIDResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *access.GetExecutionResultByIDRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetExecutionResultForBlockID provides a mock function with given fields: ctx, in, opts func (_m *AccessAPIClient) GetExecutionResultForBlockID(ctx context.Context, in *access.GetExecutionResultForBlockIDRequest, opts ...grpc.CallOption) (*access.ExecutionResultForBlockIDResponse, error) { _va := make([]interface{}, len(opts)) diff --git a/engine/access/mock/access_api_server.go b/engine/access/mock/access_api_server.go index 5515698eacd..1a2c3772e44 100644 --- a/engine/access/mock/access_api_server.go +++ b/engine/access/mock/access_api_server.go @@ -353,6 +353,32 @@ func (_m *AccessAPIServer) GetEventsForHeightRange(_a0 context.Context, _a1 *acc return r0, r1 } +// GetExecutionResultByID provides a mock function with given fields: _a0, _a1 +func (_m *AccessAPIServer) GetExecutionResultByID(_a0 context.Context, _a1 *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *access.ExecutionResultByIDResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *access.GetExecutionResultByIDRequest) *access.ExecutionResultByIDResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*access.ExecutionResultByIDResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *access.GetExecutionResultByIDRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetExecutionResultForBlockID provides a mock function with given fields: _a0, _a1 func (_m *AccessAPIServer) GetExecutionResultForBlockID(_a0 context.Context, _a1 *access.GetExecutionResultForBlockIDRequest) (*access.ExecutionResultForBlockIDResponse, error) { ret := _m.Called(_a0, _a1) diff --git a/engine/access/rest/README.md b/engine/access/rest/README.md index fd7b970493d..d94af68c238 100644 --- a/engine/access/rest/README.md +++ b/engine/access/rest/README.md @@ -6,10 +6,14 @@ available on our [docs site](https://docs.onflow.org/http-api/). ## Packages -- `rest`: The HTTP handlers for all the request, server generator and the select filter. +- `rest`: The HTTP handlers for the server generator and the select filter, implementation of handling local requests. - `middleware`: The common [middlewares](https://github.com/gorilla/mux#middleware) that all request pass through. - `models`: The generated models using openapi generators and implementation of model builders. - `request`: Implementation of API requests that provide validation for input data and build request models. +- `routes`: The common HTTP handlers for all the requests, tests for each request. +- `apiproxy`: Implementation of proxy backend handler which includes the local backend and forwards the methods which +can't be handled locally to an upstream using gRPC API. This is used by observers that don't have all data in their +local db. ## Request lifecycle @@ -37,7 +41,7 @@ make generate-openapi ### Adding New API Endpoints -A new endpoint can be added by first implementing a new request handler, a request handle is a function in the rest +A new endpoint can be added by first implementing a new request handler, a request handle is a function in the routes package that complies with function interfaced defined as: ```go @@ -48,6 +52,7 @@ generator models.LinkGenerator, ) (interface{}, error) ``` -That handler implementation needs to be added to the `router.go` with corresponding API endpoint and method. Adding a -new API endpoint also requires for a new request builder to be implemented and added in request package. Make sure to -not forget about adding tests for each of the API handler. +That handler implementation needs to be added to the `router.go` with corresponding API endpoint and method. If the data +is not available on observers, an override the method is needed in the backend handler `RestProxyHandler` for request +forwarding. Adding a new API endpoint also requires for a new request builder to be implemented and added in request +package. Make sure to not forget about adding tests for each of the API handler. diff --git a/engine/access/rest/apiproxy/rest_proxy_handler.go b/engine/access/rest/apiproxy/rest_proxy_handler.go new file mode 100644 index 00000000000..01e7b56724d --- /dev/null +++ b/engine/access/rest/apiproxy/rest_proxy_handler.go @@ -0,0 +1,344 @@ +package apiproxy + +import ( + "context" + "fmt" + "time" + + "google.golang.org/grpc/status" + + "github.com/rs/zerolog" + + "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/common/grpc/forwarder" + "github.com/onflow/flow-go/engine/common/rpc/convert" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/metrics" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" +) + +// RestProxyHandler is a structure that represents the proxy algorithm for observer node. +// It includes the local backend and forwards the methods which can't be handled locally to an upstream using gRPC API. +type RestProxyHandler struct { + access.API + *forwarder.Forwarder + Logger zerolog.Logger + Metrics metrics.ObserverMetrics + Chain flow.Chain +} + +// NewRestProxyHandler returns a new rest proxy handler for observer node. +func NewRestProxyHandler( + api access.API, + identities flow.IdentityList, + timeout time.Duration, + maxMsgSize uint, + log zerolog.Logger, + metrics metrics.ObserverMetrics, + chain flow.Chain, +) (*RestProxyHandler, error) { + + forwarder, err := forwarder.NewForwarder( + identities, + timeout, + maxMsgSize) + if err != nil { + return nil, fmt.Errorf("could not create REST forwarder: %w", err) + } + + restProxyHandler := &RestProxyHandler{ + Logger: log, + Metrics: metrics, + Chain: chain, + } + + restProxyHandler.API = api + restProxyHandler.Forwarder = forwarder + + return restProxyHandler, nil +} + +func (r *RestProxyHandler) log(handler, rpc string, err error) { + code := status.Code(err) + r.Metrics.RecordRPC(handler, rpc, code) + + logger := r.Logger.With(). + Str("handler", handler). + Str("rest_method", rpc). + Str("rest_code", code.String()). + Logger() + + if err != nil { + logger.Error().Err(err).Msg("request failed") + return + } + + logger.Info().Msg("request succeeded") +} + +// GetCollectionByID returns a collection by ID. +func (r *RestProxyHandler) GetCollectionByID(ctx context.Context, id flow.Identifier) (*flow.LightCollection, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + getCollectionByIDRequest := &accessproto.GetCollectionByIDRequest{ + Id: id[:], + } + + collectionResponse, err := upstream.GetCollectionByID(ctx, getCollectionByIDRequest) + r.log("upstream", "GetCollectionByID", err) + + if err != nil { + return nil, err + } + + transactions, err := convert.MessageToLightCollection(collectionResponse.Collection) + if err != nil { + return nil, err + } + + return transactions, nil +} + +// SendTransaction sends already created transaction. +func (r *RestProxyHandler) SendTransaction(ctx context.Context, tx *flow.TransactionBody) error { + upstream, err := r.FaultTolerantClient() + if err != nil { + return err + } + + transaction := convert.TransactionToMessage(*tx) + sendTransactionRequest := &accessproto.SendTransactionRequest{ + Transaction: transaction, + } + + _, err = upstream.SendTransaction(ctx, sendTransactionRequest) + r.log("upstream", "SendTransaction", err) + + return err +} + +// GetTransaction returns transaction by ID. +func (r *RestProxyHandler) GetTransaction(ctx context.Context, id flow.Identifier) (*flow.TransactionBody, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + getTransactionRequest := &accessproto.GetTransactionRequest{ + Id: id[:], + } + transactionResponse, err := upstream.GetTransaction(ctx, getTransactionRequest) + r.log("upstream", "GetTransaction", err) + + if err != nil { + return nil, err + } + + transactionBody, err := convert.MessageToTransaction(transactionResponse.Transaction, r.Chain) + if err != nil { + return nil, err + } + + return &transactionBody, nil +} + +// GetTransactionResult returns transaction result by the transaction ID. +func (r *RestProxyHandler) GetTransactionResult(ctx context.Context, id flow.Identifier, blockID flow.Identifier, collectionID flow.Identifier) (*access.TransactionResult, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + + return nil, err + } + + getTransactionResultRequest := &accessproto.GetTransactionRequest{ + Id: id[:], + BlockId: blockID[:], + CollectionId: collectionID[:], + } + + transactionResultResponse, err := upstream.GetTransactionResult(ctx, getTransactionResultRequest) + r.log("upstream", "GetTransactionResult", err) + + if err != nil { + return nil, err + } + + return access.MessageToTransactionResult(transactionResultResponse), nil +} + +// GetAccountAtBlockHeight returns account by account address and block height. +func (r *RestProxyHandler) GetAccountAtBlockHeight(ctx context.Context, address flow.Address, height uint64) (*flow.Account, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + getAccountAtBlockHeightRequest := &accessproto.GetAccountAtBlockHeightRequest{ + Address: address.Bytes(), + BlockHeight: height, + } + + accountResponse, err := upstream.GetAccountAtBlockHeight(ctx, getAccountAtBlockHeightRequest) + r.log("upstream", "GetAccountAtBlockHeight", err) + + if err != nil { + return nil, err + } + + return convert.MessageToAccount(accountResponse.Account) +} + +// ExecuteScriptAtLatestBlock executes script at latest block. +func (r *RestProxyHandler) ExecuteScriptAtLatestBlock(ctx context.Context, script []byte, arguments [][]byte) ([]byte, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + executeScriptAtLatestBlockRequest := &accessproto.ExecuteScriptAtLatestBlockRequest{ + Script: script, + Arguments: arguments, + } + executeScriptAtLatestBlockResponse, err := upstream.ExecuteScriptAtLatestBlock(ctx, executeScriptAtLatestBlockRequest) + r.log("upstream", "ExecuteScriptAtLatestBlock", err) + + if err != nil { + return nil, err + } + + return executeScriptAtLatestBlockResponse.Value, nil +} + +// ExecuteScriptAtBlockHeight executes script at the given block height . +func (r *RestProxyHandler) ExecuteScriptAtBlockHeight(ctx context.Context, blockHeight uint64, script []byte, arguments [][]byte) ([]byte, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + executeScriptAtBlockHeightRequest := &accessproto.ExecuteScriptAtBlockHeightRequest{ + BlockHeight: blockHeight, + Script: script, + Arguments: arguments, + } + executeScriptAtBlockHeightResponse, err := upstream.ExecuteScriptAtBlockHeight(ctx, executeScriptAtBlockHeightRequest) + r.log("upstream", "ExecuteScriptAtBlockHeight", err) + + if err != nil { + return nil, err + } + + return executeScriptAtBlockHeightResponse.Value, nil +} + +// ExecuteScriptAtBlockID executes script at the given block id . +func (r *RestProxyHandler) ExecuteScriptAtBlockID(ctx context.Context, blockID flow.Identifier, script []byte, arguments [][]byte) ([]byte, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + executeScriptAtBlockIDRequest := &accessproto.ExecuteScriptAtBlockIDRequest{ + BlockId: blockID[:], + Script: script, + Arguments: arguments, + } + executeScriptAtBlockIDResponse, err := upstream.ExecuteScriptAtBlockID(ctx, executeScriptAtBlockIDRequest) + r.log("upstream", "ExecuteScriptAtBlockID", err) + + if err != nil { + return nil, err + } + + return executeScriptAtBlockIDResponse.Value, nil +} + +// GetEventsForHeightRange returns events by their name in the specified blocks heights. +func (r *RestProxyHandler) GetEventsForHeightRange(ctx context.Context, eventType string, startHeight, endHeight uint64) ([]flow.BlockEvents, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + getEventsForHeightRangeRequest := &accessproto.GetEventsForHeightRangeRequest{ + Type: eventType, + StartHeight: startHeight, + EndHeight: endHeight, + } + eventsResponse, err := upstream.GetEventsForHeightRange(ctx, getEventsForHeightRangeRequest) + r.log("upstream", "GetEventsForHeightRange", err) + + if err != nil { + return nil, err + } + + return convert.MessagesToBlockEvents(eventsResponse.Results), nil +} + +// GetEventsForBlockIDs returns events by their name in the specified block IDs. +func (r *RestProxyHandler) GetEventsForBlockIDs(ctx context.Context, eventType string, blockIDs []flow.Identifier) ([]flow.BlockEvents, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + blockIds := convert.IdentifiersToMessages(blockIDs) + + getEventsForBlockIDsRequest := &accessproto.GetEventsForBlockIDsRequest{ + Type: eventType, + BlockIds: blockIds, + } + eventsResponse, err := upstream.GetEventsForBlockIDs(ctx, getEventsForBlockIDsRequest) + r.log("upstream", "GetEventsForBlockIDs", err) + + if err != nil { + return nil, err + } + + return convert.MessagesToBlockEvents(eventsResponse.Results), nil +} + +// GetExecutionResultForBlockID gets execution result by provided block ID. +func (r *RestProxyHandler) GetExecutionResultForBlockID(ctx context.Context, blockID flow.Identifier) (*flow.ExecutionResult, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + getExecutionResultForBlockID := &accessproto.GetExecutionResultForBlockIDRequest{ + BlockId: blockID[:], + } + executionResultForBlockIDResponse, err := upstream.GetExecutionResultForBlockID(ctx, getExecutionResultForBlockID) + r.log("upstream", "GetExecutionResultForBlockID", err) + + if err != nil { + return nil, err + } + + return convert.MessageToExecutionResult(executionResultForBlockIDResponse.ExecutionResult) +} + +// GetExecutionResultByID gets execution result by its ID. +func (r *RestProxyHandler) GetExecutionResultByID(ctx context.Context, id flow.Identifier) (*flow.ExecutionResult, error) { + upstream, err := r.FaultTolerantClient() + if err != nil { + return nil, err + } + + executionResultByIDRequest := &accessproto.GetExecutionResultByIDRequest{ + Id: id[:], + } + + executionResultByIDResponse, err := upstream.GetExecutionResultByID(ctx, executionResultByIDRequest) + r.log("upstream", "GetExecutionResultByID", err) + + if err != nil { + return nil, err + } + + return convert.MessageToExecutionResult(executionResultByIDResponse.ExecutionResult) +} diff --git a/engine/access/rest/error.go b/engine/access/rest/models/error.go similarity index 98% rename from engine/access/rest/error.go rename to engine/access/rest/models/error.go index 7403510ba55..2247b38743b 100644 --- a/engine/access/rest/error.go +++ b/engine/access/rest/models/error.go @@ -1,4 +1,4 @@ -package rest +package models import "net/http" diff --git a/engine/access/rest/accounts.go b/engine/access/rest/routes/accounts.go similarity index 94% rename from engine/access/rest/accounts.go rename to engine/access/rest/routes/accounts.go index 36371bf6c57..972c2ba68ac 100644 --- a/engine/access/rest/accounts.go +++ b/engine/access/rest/routes/accounts.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "github.com/onflow/flow-go/access" @@ -10,7 +10,7 @@ import ( func GetAccount(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetAccountRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } // in case we receive special height values 'final' and 'sealed', fetch that height and overwrite request with it diff --git a/engine/access/rest/accounts_test.go b/engine/access/rest/routes/accounts_test.go similarity index 93% rename from engine/access/rest/accounts_test.go rename to engine/access/rest/routes/accounts_test.go index 61982ff5f9c..b8bebea8e85 100644 --- a/engine/access/rest/accounts_test.go +++ b/engine/access/rest/routes/accounts_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" @@ -14,6 +14,7 @@ import ( "github.com/onflow/flow-go/access/mock" "github.com/onflow/flow-go/engine/access/rest/middleware" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" ) @@ -33,7 +34,15 @@ func accountURL(t *testing.T, address string, height string) string { return u.String() } -func TestGetAccount(t *testing.T) { +// TestAccessGetAccount tests local getAccount request. +// +// Runs the following tests: +// 1. Get account by address at latest sealed block. +// 2. Get account by address at latest finalized block. +// 3. Get account by address at height. +// 4. Get account by address at height condensed. +// 5. Get invalid account. +func TestAccessGetAccount(t *testing.T) { backend := &mock.API{} t.Run("get by address at latest sealed block", func(t *testing.T) { @@ -165,6 +174,7 @@ func getAccountRequest(t *testing.T, account *flow.Account, height string, expan q.Add(middleware.ExpandQueryParam, fieldParam) req.URL.RawQuery = q.Encode() } + require.NoError(t, err) return req } diff --git a/engine/access/rest/blocks.go b/engine/access/rest/routes/blocks.go similarity index 91% rename from engine/access/rest/blocks.go rename to engine/access/rest/routes/blocks.go index e729f67a9bd..c26f14dd8bf 100644 --- a/engine/access/rest/blocks.go +++ b/engine/access/rest/routes/blocks.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "context" @@ -18,10 +18,11 @@ import ( func GetBlocksByIDs(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetBlockByIDsRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } blocks := make([]*models.Block, len(req.IDs)) + for i, id := range req.IDs { block, err := getBlock(forID(&id), r, backend, link) if err != nil { @@ -33,10 +34,11 @@ func GetBlocksByIDs(r *request.Request, backend access.API, link models.LinkGene return blocks, nil } +// GetBlocksByHeight gets blocks by height. func GetBlocksByHeight(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetBlockRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } if req.FinalHeight || req.SealedHeight { @@ -72,7 +74,7 @@ func GetBlocksByHeight(r *request.Request, backend access.API, link models.LinkG req.EndHeight = latest.Header.Height // overwrite special value height with fetched if req.StartHeight > req.EndHeight { - return nil, NewBadRequestError(fmt.Errorf("start height must be less than or equal to end height")) + return nil, models.NewBadRequestError(fmt.Errorf("start height must be less than or equal to end height")) } } @@ -93,7 +95,7 @@ func GetBlocksByHeight(r *request.Request, backend access.API, link models.LinkG func GetBlockPayloadByID(r *request.Request, backend access.API, _ models.LinkGenerator) (interface{}, error) { req, err := r.GetBlockPayloadRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } blkProvider := NewBlockProvider(backend, forID(&req.ID)) @@ -194,7 +196,7 @@ func (blkProvider *blockProvider) getBlock(ctx context.Context) (*flow.Block, fl if blkProvider.id != nil { blk, _, err := blkProvider.backend.GetBlockByID(ctx, *blkProvider.id) if err != nil { // unfortunately backend returns internal error status if not found - return nil, flow.BlockStatusUnknown, NewNotFoundError( + return nil, flow.BlockStatusUnknown, models.NewNotFoundError( fmt.Sprintf("error looking up block with ID %s", blkProvider.id.String()), err, ) } @@ -205,14 +207,14 @@ func (blkProvider *blockProvider) getBlock(ctx context.Context) (*flow.Block, fl blk, status, err := blkProvider.backend.GetLatestBlock(ctx, blkProvider.sealed) if err != nil { // cannot be a 'not found' error since final and sealed block should always be found - return nil, flow.BlockStatusUnknown, NewRestError(http.StatusInternalServerError, "block lookup failed", err) + return nil, flow.BlockStatusUnknown, models.NewRestError(http.StatusInternalServerError, "block lookup failed", err) } return blk, status, nil } blk, status, err := blkProvider.backend.GetBlockByHeight(ctx, blkProvider.height) if err != nil { // unfortunately backend returns internal error status if not found - return nil, flow.BlockStatusUnknown, NewNotFoundError( + return nil, flow.BlockStatusUnknown, models.NewNotFoundError( fmt.Sprintf("error looking up block at height %d", blkProvider.height), err, ) } diff --git a/engine/access/rest/blocks_test.go b/engine/access/rest/routes/blocks_test.go similarity index 96% rename from engine/access/rest/blocks_test.go rename to engine/access/rest/routes/blocks_test.go index 7f977b06d69..3abccc9c78a 100644 --- a/engine/access/rest/blocks_test.go +++ b/engine/access/rest/routes/blocks_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" @@ -31,12 +31,12 @@ type testVector struct { expectedResponse string } -// TestGetBlocks tests the get blocks by ID and get blocks by heights API -func TestGetBlocks(t *testing.T) { - backend := &mock.API{} - - blkCnt := 10 - blockIDs, heights, blocks, executionResults := generateMocks(backend, blkCnt) +func prepareTestVectors(t *testing.T, + blockIDs []string, + heights []string, + blocks []*flow.Block, + executionResults []*flow.ExecutionResult, + blkCnt int) []testVector { singleBlockExpandedResponse := expectedBlockResponsesExpanded(blocks[:1], executionResults[:1], true, flow.BlockStatusUnknown) singleSealedBlockExpandedResponse := expectedBlockResponsesExpanded(blocks[:1], executionResults[:1], true, flow.BlockStatusSealed) @@ -137,6 +137,16 @@ func TestGetBlocks(t *testing.T) { expectedResponse: fmt.Sprintf(`{"code":400, "message": "at most %d IDs can be requested at a time"}`, request.MaxBlockRequestHeightRange), }, } + return testVectors +} + +// TestGetBlocks tests local get blocks by ID and get blocks by heights API +func TestAccessGetBlocks(t *testing.T) { + backend := &mock.API{} + + blkCnt := 10 + blockIDs, heights, blocks, executionResults := generateMocks(backend, blkCnt) + testVectors := prepareTestVectors(t, blockIDs, heights, blocks, executionResults, blkCnt) for _, tv := range testVectors { responseRec, err := executeRequest(tv.request, backend) diff --git a/engine/access/rest/collections.go b/engine/access/rest/routes/collections.go similarity index 94% rename from engine/access/rest/collections.go rename to engine/access/rest/routes/collections.go index 807be2c0c41..47b6150f480 100644 --- a/engine/access/rest/collections.go +++ b/engine/access/rest/routes/collections.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "github.com/onflow/flow-go/access" @@ -11,7 +11,7 @@ import ( func GetCollectionByID(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetCollectionRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } collection, err := backend.GetCollectionByID(r.Context(), req.ID) diff --git a/engine/access/rest/collections_test.go b/engine/access/rest/routes/collections_test.go similarity index 99% rename from engine/access/rest/collections_test.go rename to engine/access/rest/routes/collections_test.go index 3981541f3a7..de05152b6d5 100644 --- a/engine/access/rest/collections_test.go +++ b/engine/access/rest/routes/collections_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "encoding/json" diff --git a/engine/access/rest/events.go b/engine/access/rest/routes/events.go similarity index 85% rename from engine/access/rest/events.go rename to engine/access/rest/routes/events.go index 2a79939bc21..4f03624c768 100644 --- a/engine/access/rest/events.go +++ b/engine/access/rest/routes/events.go @@ -1,22 +1,21 @@ -package rest +package routes import ( "fmt" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/models" "github.com/onflow/flow-go/engine/access/rest/request" - - "github.com/onflow/flow-go/access" ) -const blockQueryParam = "block_ids" -const eventTypeQuery = "type" +const BlockQueryParam = "block_ids" +const EventTypeQuery = "type" // GetEvents for the provided block range or list of block IDs filtered by type. func GetEvents(r *request.Request, backend access.API, _ models.LinkGenerator) (interface{}, error) { req, err := r.GetEventsRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } // if the request has block IDs provided then return events for block IDs @@ -41,7 +40,7 @@ func GetEvents(r *request.Request, backend access.API, _ models.LinkGenerator) ( req.EndHeight = latest.Height // special check after we resolve special height value if req.StartHeight > req.EndHeight { - return nil, NewBadRequestError(fmt.Errorf("current retrieved end height value is lower than start height")) + return nil, models.NewBadRequestError(fmt.Errorf("current retrieved end height value is lower than start height")) } } diff --git a/engine/access/rest/events_test.go b/engine/access/rest/routes/events_test.go similarity index 98% rename from engine/access/rest/events_test.go rename to engine/access/rest/routes/events_test.go index 9f0fede2c6c..47d4d89fd52 100644 --- a/engine/access/rest/events_test.go +++ b/engine/access/rest/routes/events_test.go @@ -1,24 +1,25 @@ -package rest +package routes import ( "encoding/json" "fmt" + "net/http" "net/url" "strings" "testing" "time" - "github.com/onflow/flow-go/engine/access/rest/util" - - "github.com/onflow/flow-go/access/mock" - "github.com/onflow/flow-go/model/flow" - "github.com/onflow/flow-go/utils/unittest" - mocks "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + "github.com/onflow/flow-go/access/mock" + "github.com/onflow/flow-go/engine/access/rest/util" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" ) func TestGetEvents(t *testing.T) { @@ -136,7 +137,7 @@ func getEventReq(t *testing.T, eventType string, start string, end string, block q := u.Query() if len(blockIDs) > 0 { - q.Add(blockQueryParam, strings.Join(blockIDs, ",")) + q.Add(BlockQueryParam, strings.Join(blockIDs, ",")) } if start != "" && end != "" { @@ -144,7 +145,7 @@ func getEventReq(t *testing.T, eventType string, start string, end string, block q.Add(endHeightQueryParam, end) } - q.Add(eventTypeQuery, eventType) + q.Add(EventTypeQuery, eventType) u.RawQuery = q.Encode() diff --git a/engine/access/rest/execution_result.go b/engine/access/rest/routes/execution_result.go similarity index 89% rename from engine/access/rest/execution_result.go rename to engine/access/rest/routes/execution_result.go index b0583d43b0d..b999665b26b 100644 --- a/engine/access/rest/execution_result.go +++ b/engine/access/rest/routes/execution_result.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" @@ -12,7 +12,7 @@ import ( func GetExecutionResultsByBlockIDs(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetExecutionResultByBlockIDsRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } // for each block ID we retrieve execution result @@ -38,7 +38,7 @@ func GetExecutionResultsByBlockIDs(r *request.Request, backend access.API, link func GetExecutionResultByID(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetExecutionResultRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } res, err := backend.GetExecutionResultByID(r.Context(), req.ID) @@ -48,7 +48,7 @@ func GetExecutionResultByID(r *request.Request, backend access.API, link models. if res == nil { err := fmt.Errorf("execution result with ID: %s not found", req.ID.String()) - return nil, NewNotFoundError(err.Error(), err) + return nil, models.NewNotFoundError(err.Error(), err) } var response models.ExecutionResult diff --git a/engine/access/rest/execution_result_test.go b/engine/access/rest/routes/execution_result_test.go similarity index 99% rename from engine/access/rest/execution_result_test.go rename to engine/access/rest/routes/execution_result_test.go index adb3852c668..ba74974af1a 100644 --- a/engine/access/rest/execution_result_test.go +++ b/engine/access/rest/routes/execution_result_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" @@ -37,7 +37,6 @@ func getResultByIDReq(id string, blockIDs []string) *http.Request { } func TestGetResultByID(t *testing.T) { - t.Run("get by ID", func(t *testing.T) { backend := &mock.API{} result := unittest.ExecutionResultFixture() @@ -68,6 +67,7 @@ func TestGetResultByID(t *testing.T) { } func TestGetResultBlockID(t *testing.T) { + t.Run("get by block ID", func(t *testing.T) { backend := &mock.API{} blockID := unittest.IdentifierFixture() diff --git a/engine/access/rest/handler.go b/engine/access/rest/routes/handler.go similarity index 95% rename from engine/access/rest/handler.go rename to engine/access/rest/routes/handler.go index 028176fc9e0..e323843e50e 100644 --- a/engine/access/rest/handler.go +++ b/engine/access/rest/routes/handler.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "encoding/json" @@ -6,17 +6,17 @@ import ( "fmt" "net/http" - "github.com/onflow/flow-go/engine/access/rest/models" - "github.com/onflow/flow-go/engine/access/rest/request" - "github.com/onflow/flow-go/engine/access/rest/util" - fvmErrors "github.com/onflow/flow-go/fvm/errors" - "github.com/onflow/flow-go/model/flow" - "github.com/rs/zerolog" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/models" + "github.com/onflow/flow-go/engine/access/rest/request" + "github.com/onflow/flow-go/engine/access/rest/util" + fvmErrors "github.com/onflow/flow-go/fvm/errors" + "github.com/onflow/flow-go/model/flow" ) const MaxRequestSize = 2 << 20 // 2MB @@ -93,7 +93,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *Handler) errorHandler(w http.ResponseWriter, err error, errorLogger zerolog.Logger) { // rest status type error should be returned with status and user message provided - var statusErr StatusError + var statusErr models.StatusError if errors.As(err, &statusErr) { h.errorResponse(w, statusErr.Status(), statusErr.UserMessage(), errorLogger) return @@ -124,6 +124,11 @@ func (h *Handler) errorHandler(w http.ResponseWriter, err error, errorLogger zer h.errorResponse(w, http.StatusBadRequest, msg, errorLogger) return } + if se.Code() == codes.Unavailable { + msg := fmt.Sprintf("Failed to process request: %s", se.Message()) + h.errorResponse(w, http.StatusServiceUnavailable, msg, errorLogger) + return + } } // stop going further - catch all error diff --git a/engine/access/rest/network.go b/engine/access/rest/routes/network.go similarity index 87% rename from engine/access/rest/network.go rename to engine/access/rest/routes/network.go index 6100bc765d5..82abcbb6d49 100644 --- a/engine/access/rest/network.go +++ b/engine/access/rest/routes/network.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "github.com/onflow/flow-go/access" @@ -7,7 +7,7 @@ import ( ) // GetNetworkParameters returns network-wide parameters of the blockchain -func GetNetworkParameters(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { +func GetNetworkParameters(r *request.Request, backend access.API, _ models.LinkGenerator) (interface{}, error) { params := backend.GetNetworkParameters(r.Context()) var response models.NetworkParameters diff --git a/engine/access/rest/network_test.go b/engine/access/rest/routes/network_test.go similarity index 98% rename from engine/access/rest/network_test.go rename to engine/access/rest/routes/network_test.go index c4ce7492476..00d0ca03944 100644 --- a/engine/access/rest/network_test.go +++ b/engine/access/rest/routes/network_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" diff --git a/engine/access/rest/node_version_info.go b/engine/access/rest/routes/node_version_info.go similarity index 87% rename from engine/access/rest/node_version_info.go rename to engine/access/rest/routes/node_version_info.go index 899d159cf4f..31e172bba9f 100644 --- a/engine/access/rest/node_version_info.go +++ b/engine/access/rest/routes/node_version_info.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "github.com/onflow/flow-go/access" @@ -7,7 +7,7 @@ import ( ) // GetNodeVersionInfo returns node version information -func GetNodeVersionInfo(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { +func GetNodeVersionInfo(r *request.Request, backend access.API, _ models.LinkGenerator) (interface{}, error) { params, err := backend.GetNodeVersionInfo(r.Context()) if err != nil { return nil, err diff --git a/engine/access/rest/node_version_info_test.go b/engine/access/rest/routes/node_version_info_test.go similarity index 99% rename from engine/access/rest/node_version_info_test.go rename to engine/access/rest/routes/node_version_info_test.go index 4140089a280..25f19ae1f3c 100644 --- a/engine/access/rest/node_version_info_test.go +++ b/engine/access/rest/routes/node_version_info_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" diff --git a/engine/access/rest/router.go b/engine/access/rest/routes/router.go similarity index 98% rename from engine/access/rest/router.go rename to engine/access/rest/routes/router.go index a6bfaa501c5..a2185e4e9a3 100644 --- a/engine/access/rest/router.go +++ b/engine/access/rest/routes/router.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "fmt" @@ -16,7 +16,7 @@ import ( "github.com/onflow/flow-go/module" ) -func newRouter(backend access.API, logger zerolog.Logger, chain flow.Chain, restCollector module.RestMetrics) (*mux.Router, error) { +func NewRouter(backend access.API, logger zerolog.Logger, chain flow.Chain, restCollector module.RestMetrics) (*mux.Router, error) { router := mux.NewRouter().StrictSlash(true) v1SubRouter := router.PathPrefix("/v1").Subrouter() diff --git a/engine/access/rest/router_test.go b/engine/access/rest/routes/router_test.go similarity index 99% rename from engine/access/rest/router_test.go rename to engine/access/rest/routes/router_test.go index 5b6578be8a1..e3c2a2c3fdd 100644 --- a/engine/access/rest/router_test.go +++ b/engine/access/rest/routes/router_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "testing" diff --git a/engine/access/rest/scripts.go b/engine/access/rest/routes/scripts.go similarity index 94% rename from engine/access/rest/scripts.go rename to engine/access/rest/routes/scripts.go index 8bd86bae54f..8627470ab88 100644 --- a/engine/access/rest/scripts.go +++ b/engine/access/rest/routes/scripts.go @@ -1,18 +1,17 @@ -package rest +package routes import ( + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/engine/access/rest/models" "github.com/onflow/flow-go/engine/access/rest/request" "github.com/onflow/flow-go/model/flow" - - "github.com/onflow/flow-go/access" ) // ExecuteScript handler sends the script from the request to be executed. func ExecuteScript(r *request.Request, backend access.API, _ models.LinkGenerator) (interface{}, error) { req, err := r.GetScriptRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } if req.BlockID != flow.ZeroID { diff --git a/engine/access/rest/scripts_test.go b/engine/access/rest/routes/scripts_test.go similarity index 99% rename from engine/access/rest/scripts_test.go rename to engine/access/rest/routes/scripts_test.go index 7e3271c1d81..8a6a63cc819 100644 --- a/engine/access/rest/scripts_test.go +++ b/engine/access/rest/routes/scripts_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "bytes" diff --git a/engine/access/rest/test_helpers.go b/engine/access/rest/routes/test_helpers.go similarity index 77% rename from engine/access/rest/test_helpers.go rename to engine/access/rest/routes/test_helpers.go index 88170769c99..e512cc94434 100644 --- a/engine/access/rest/test_helpers.go +++ b/engine/access/rest/routes/test_helpers.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "bytes" @@ -11,7 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/onflow/flow-go/access/mock" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" ) @@ -26,11 +26,11 @@ const ( heightQueryParam = "height" ) -func executeRequest(req *http.Request, backend *mock.API) (*httptest.ResponseRecorder, error) { +func executeRequest(req *http.Request, backend access.API) (*httptest.ResponseRecorder, error) { var b bytes.Buffer logger := zerolog.New(&b) - restCollector := metrics.NewNoopCollector() - router, err := newRouter(backend, logger, flow.Testnet.Chain(), restCollector) + + router, err := NewRouter(backend, logger, flow.Testnet.Chain(), metrics.NewNoopCollector()) if err != nil { return nil, err } @@ -40,14 +40,13 @@ func executeRequest(req *http.Request, backend *mock.API) (*httptest.ResponseRec return rr, nil } -func assertOKResponse(t *testing.T, req *http.Request, expectedRespBody string, backend *mock.API) { +func assertOKResponse(t *testing.T, req *http.Request, expectedRespBody string, backend access.API) { assertResponse(t, req, http.StatusOK, expectedRespBody, backend) } -func assertResponse(t *testing.T, req *http.Request, status int, expectedRespBody string, backend *mock.API) { +func assertResponse(t *testing.T, req *http.Request, status int, expectedRespBody string, backend access.API) { rr, err := executeRequest(req, backend) assert.NoError(t, err) - actualResponseBody := rr.Body.String() require.JSONEq(t, expectedRespBody, diff --git a/engine/access/rest/transactions.go b/engine/access/rest/routes/transactions.go similarity index 92% rename from engine/access/rest/transactions.go rename to engine/access/rest/routes/transactions.go index f8dfc83dedb..b77aead82b4 100644 --- a/engine/access/rest/transactions.go +++ b/engine/access/rest/routes/transactions.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "github.com/onflow/flow-go/access" @@ -10,7 +10,7 @@ import ( func GetTransactionByID(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetTransactionRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } tx, err := backend.GetTransaction(r.Context(), req.ID) @@ -36,7 +36,7 @@ func GetTransactionByID(r *request.Request, backend access.API, link models.Link func GetTransactionResultByID(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.GetTransactionResultRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } txr, err := backend.GetTransactionResult(r.Context(), req.ID, req.BlockID, req.CollectionID) @@ -53,7 +53,7 @@ func GetTransactionResultByID(r *request.Request, backend access.API, link model func CreateTransaction(r *request.Request, backend access.API, link models.LinkGenerator) (interface{}, error) { req, err := r.CreateTransactionRequest() if err != nil { - return nil, NewBadRequestError(err) + return nil, models.NewBadRequestError(err) } err = backend.SendTransaction(r.Context(), &req.Transaction) diff --git a/engine/access/rest/transactions_test.go b/engine/access/rest/routes/transactions_test.go similarity index 89% rename from engine/access/rest/transactions_test.go rename to engine/access/rest/routes/transactions_test.go index 26710c747e5..3b02c4d5de5 100644 --- a/engine/access/rest/transactions_test.go +++ b/engine/access/rest/routes/transactions_test.go @@ -1,4 +1,4 @@ -package rest +package routes import ( "bytes" @@ -69,40 +69,7 @@ func createTransactionReq(body interface{}) *http.Request { return req } -func validCreateBody(tx flow.TransactionBody) map[string]interface{} { - tx.Arguments = [][]uint8{} // fix how fixture creates nil values - auth := make([]string, len(tx.Authorizers)) - for i, a := range tx.Authorizers { - auth[i] = a.String() - } - - return map[string]interface{}{ - "script": util.ToBase64(tx.Script), - "arguments": tx.Arguments, - "reference_block_id": tx.ReferenceBlockID.String(), - "gas_limit": fmt.Sprintf("%d", tx.GasLimit), - "payer": tx.Payer.String(), - "proposal_key": map[string]interface{}{ - "address": tx.ProposalKey.Address.String(), - "key_index": fmt.Sprintf("%d", tx.ProposalKey.KeyIndex), - "sequence_number": fmt.Sprintf("%d", tx.ProposalKey.SequenceNumber), - }, - "authorizers": auth, - "payload_signatures": []map[string]interface{}{{ - "address": tx.PayloadSignatures[0].Address.String(), - "key_index": fmt.Sprintf("%d", tx.PayloadSignatures[0].KeyIndex), - "signature": util.ToBase64(tx.PayloadSignatures[0].Signature), - }}, - "envelope_signatures": []map[string]interface{}{{ - "address": tx.EnvelopeSignatures[0].Address.String(), - "key_index": fmt.Sprintf("%d", tx.EnvelopeSignatures[0].KeyIndex), - "signature": util.ToBase64(tx.EnvelopeSignatures[0].Signature), - }}, - } -} - func TestGetTransactions(t *testing.T) { - t.Run("get by ID without results", func(t *testing.T) { backend := &mock.API{} tx := unittest.TransactionFixture() @@ -150,6 +117,7 @@ func TestGetTransactions(t *testing.T) { t.Run("Get by ID with results", func(t *testing.T) { backend := &mock.API{} + tx := unittest.TransactionFixture() txr := transactionResultFixture(tx) @@ -227,6 +195,7 @@ func TestGetTransactions(t *testing.T) { t.Run("get by ID non-existing", func(t *testing.T) { backend := &mock.API{} + tx := unittest.TransactionFixture() req := getTransactionReq(tx.ID().String(), false, "", "") @@ -278,6 +247,7 @@ func TestGetTransactionResult(t *testing.T) { t.Run("get by transaction ID", func(t *testing.T) { backend := &mock.API{} + req := getTransactionResultReq(id.String(), "", "") backend.Mock. @@ -289,6 +259,7 @@ func TestGetTransactionResult(t *testing.T) { t.Run("get by block ID", func(t *testing.T) { backend := &mock.API{} + req := getTransactionResultReq(id.String(), bid.String(), "") backend.Mock. @@ -300,6 +271,7 @@ func TestGetTransactionResult(t *testing.T) { t.Run("get by collection ID", func(t *testing.T) { backend := &mock.API{} + req := getTransactionResultReq(id.String(), "", cid.String()) backend.Mock. @@ -311,6 +283,7 @@ func TestGetTransactionResult(t *testing.T) { t.Run("get execution statuses", func(t *testing.T) { backend := &mock.API{} + testVectors := map[*access.TransactionResult]string{{ Status: flow.TransactionStatusExpired, ErrorMessage: "", @@ -359,6 +332,7 @@ func TestGetTransactionResult(t *testing.T) { t.Run("get by ID Invalid", func(t *testing.T) { backend := &mock.API{} + req := getTransactionResultReq("invalid", "", "") expected := `{"code":400, "message":"invalid ID format"}` @@ -367,13 +341,13 @@ func TestGetTransactionResult(t *testing.T) { } func TestCreateTransaction(t *testing.T) { + backend := &mock.API{} t.Run("create", func(t *testing.T) { - backend := &mock.API{} tx := unittest.TransactionBodyFixture() tx.PayloadSignatures = []flow.TransactionSignature{unittest.TransactionSignatureFixture()} tx.Arguments = [][]uint8{} - req := createTransactionReq(validCreateBody(tx)) + req := createTransactionReq(unittest.CreateSendTxHttpPayload(tx)) backend.Mock. On("SendTransaction", mocks.Anything, &tx). @@ -421,7 +395,6 @@ func TestCreateTransaction(t *testing.T) { }) t.Run("post invalid transaction", func(t *testing.T) { - backend := &mock.API{} tests := []struct { inputField string inputValue string @@ -441,7 +414,7 @@ func TestCreateTransaction(t *testing.T) { for _, test := range tests { tx := unittest.TransactionBodyFixture() tx.PayloadSignatures = []flow.TransactionSignature{unittest.TransactionSignatureFixture()} - testTx := validCreateBody(tx) + testTx := unittest.CreateSendTxHttpPayload(tx) testTx[test.inputField] = test.inputValue req := createTransactionReq(testTx) diff --git a/engine/access/rest/server.go b/engine/access/rest/server.go index a1aa83710d8..4a4b1be6f0e 100644 --- a/engine/access/rest/server.go +++ b/engine/access/rest/server.go @@ -8,14 +8,14 @@ import ( "github.com/rs/zerolog" "github.com/onflow/flow-go/access" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" ) // NewServer returns an HTTP server initialized with the REST API handler -func NewServer(backend access.API, listenAddress string, logger zerolog.Logger, chain flow.Chain, restCollector module.RestMetrics) (*http.Server, error) { - - router, err := newRouter(backend, logger, chain, restCollector) +func NewServer(serverAPI access.API, listenAddress string, logger zerolog.Logger, chain flow.Chain, restCollector module.RestMetrics) (*http.Server, error) { + router, err := routes.NewRouter(serverAPI, logger, chain, restCollector) if err != nil { return nil, err } diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 8cab886605c..24ecf554627 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -24,9 +24,10 @@ import ( "github.com/stretchr/testify/suite" accessmock "github.com/onflow/flow-go/engine/access/mock" - "github.com/onflow/flow-go/engine/access/rest" "github.com/onflow/flow-go/engine/access/rest/request" + "github.com/onflow/flow-go/engine/access/rest/routes" "github.com/onflow/flow-go/engine/access/rpc" + "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" @@ -150,10 +151,7 @@ func (suite *RestAPITestSuite) SetupTest() { nil, nil).Build() - rpcEngBuilder, err := rpc.NewBuilder( - suite.log, - suite.state, - config, + backend := backend.New(suite.state, suite.collClient, nil, suite.blocks, @@ -164,11 +162,25 @@ func (suite *RestAPITestSuite) SetupTest() { suite.executionResults, suite.chainID, suite.metrics, + nil, + false, 0, + nil, + nil, + suite.log, 0, - false, + nil) + + rpcEngBuilder, err := rpc.NewBuilder( + suite.log, + suite.state, + config, + suite.chainID, + suite.metrics, false, suite.me, + backend, + backend, suite.secureGrpcServer, suite.unsecureGrpcServer, ) @@ -399,7 +411,7 @@ func (suite *RestAPITestSuite) TestRequestSizeRestriction() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() // make a request of size larger than the max permitted size - requestBytes := make([]byte, rest.MaxRequestSize+1) + requestBytes := make([]byte, routes.MaxRequestSize+1) script := restclient.ScriptsBody{ Script: string(requestBytes), } @@ -426,13 +438,13 @@ func assertError(t *testing.T, resp *http.Response, err error, expectedCode int, func optionsForBlockByID() *restclient.BlocksApiBlocksIdGetOpts { return &restclient.BlocksApiBlocksIdGetOpts{ - Expand: optional.NewInterface([]string{rest.ExpandableFieldPayload}), + Expand: optional.NewInterface([]string{routes.ExpandableFieldPayload}), Select_: optional.NewInterface([]string{"header.id"}), } } func optionsForBlockByStartEndHeight(startHeight, endHeight uint64) *restclient.BlocksApiBlocksGetOpts { return &restclient.BlocksApiBlocksGetOpts{ - Expand: optional.NewInterface([]string{rest.ExpandableFieldPayload}), + Expand: optional.NewInterface([]string{routes.ExpandableFieldPayload}), Select_: optional.NewInterface([]string{"header.id", "header.height"}), StartHeight: optional.NewInterface(startHeight), EndHeight: optional.NewInterface(endHeight), @@ -441,7 +453,7 @@ func optionsForBlockByStartEndHeight(startHeight, endHeight uint64) *restclient. func optionsForBlockByHeights(heights []uint64) *restclient.BlocksApiBlocksGetOpts { return &restclient.BlocksApiBlocksGetOpts{ - Expand: optional.NewInterface([]string{rest.ExpandableFieldPayload}), + Expand: optional.NewInterface([]string{routes.ExpandableFieldPayload}), Select_: optional.NewInterface([]string{"header.id", "header.height"}), Height: optional.NewInterface(heights), } @@ -449,7 +461,7 @@ func optionsForBlockByHeights(heights []uint64) *restclient.BlocksApiBlocksGetOp func optionsForFinalizedBlock(finalOrSealed string) *restclient.BlocksApiBlocksGetOpts { return &restclient.BlocksApiBlocksGetOpts{ - Expand: optional.NewInterface([]string{rest.ExpandableFieldPayload}), + Expand: optional.NewInterface([]string{routes.ExpandableFieldPayload}), Select_: optional.NewInterface([]string{"header.id", "header.height"}), Height: optional.NewInterface(finalOrSealed), } diff --git a/engine/access/rpc/backend/backend.go b/engine/access/rpc/backend/backend.go index 9cce94f66a3..fbc98f5319a 100644 --- a/engine/access/rpc/backend/backend.go +++ b/engine/access/rpc/backend/backend.go @@ -11,7 +11,6 @@ import ( "google.golang.org/grpc/status" lru "github.com/hashicorp/golang-lru" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/onflow/flow-go/access" @@ -23,6 +22,8 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) // maxExecutionNodesCnt is the max number of execution nodes that will be contacted to complete an execution api request @@ -80,6 +81,17 @@ type Backend struct { connFactory ConnectionFactory } +// Config defines the configurable options for creating Backend +type Config struct { + ExecutionClientTimeout time.Duration // execution API GRPC client timeout + CollectionClientTimeout time.Duration // collection API GRPC client timeout + ConnectionPoolSize uint // size of the cache for storing collection and execution connections + MaxHeightRange uint // max size of height range requests + PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs + FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs + ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node +} + func New( state protocol.State, collectionRPC accessproto.AccessAPIClient, @@ -201,6 +213,40 @@ func New( return b } +// NewCache constructs cache for storing connections to other nodes. +// No errors are expected during normal operations. +func NewCache( + log zerolog.Logger, + accessMetrics module.AccessMetrics, + connectionPoolSize uint, +) (*lru.Cache, uint, error) { + + var cache *lru.Cache + cacheSize := connectionPoolSize + if cacheSize > 0 { + // TODO: remove this fallback after fixing issues with evictions + // It was observed that evictions cause connection errors for in flight requests. This works around + // the issue by forcing hte pool size to be greater than the number of ENs + LNs + if cacheSize < DefaultConnectionPoolSize { + log.Warn().Msg("connection pool size below threshold, setting pool size to default value ") + cacheSize = DefaultConnectionPoolSize + } + var err error + cache, err = lru.NewWithEvict(int(cacheSize), func(_, evictedValue interface{}) { + store := evictedValue.(*CachedClient) + store.Close() + log.Debug().Str("grpc_conn_evicted", store.Address).Msg("closing grpc connection evicted from pool") + if accessMetrics != nil { + accessMetrics.ConnectionFromPoolEvicted() + } + }) + if err != nil { + return nil, 0, fmt.Errorf("could not initialize connection pool cache: %w", err) + } + } + return cache, cacheSize, nil +} + func identifierList(ids []string) (flow.IdentifierList, error) { idList := make(flow.IdentifierList, len(ids)) for i, idStr := range ids { diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index bfbeb07aa53..eea6dc5d17c 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -7,13 +7,12 @@ import ( "net" "net/http" "sync" - "time" - lru "github.com/hashicorp/golang-lru" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" + "google.golang.org/grpc/credentials" + "github.com/onflow/flow-go/access" "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/engine/access/rest" "github.com/onflow/flow-go/engine/access/rpc/backend" @@ -24,28 +23,22 @@ import ( "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/state/protocol" - "github.com/onflow/flow-go/storage" ) // Config defines the configurable options for the access node server // A secure GRPC server here implies a server that presents a self-signed TLS certificate and a client that authenticates // the server via a pre-shared public key type Config struct { - UnsecureGRPCListenAddr string // the non-secure GRPC server address as ip:port - SecureGRPCListenAddr string // the secure GRPC server address as ip:port - TransportCredentials credentials.TransportCredentials // the secure GRPC credentials - HTTPListenAddr string // the HTTP web proxy address as ip:port - RESTListenAddr string // the REST server address as ip:port (if empty the REST server will not be started) - CollectionAddr string // the address of the upstream collection node - HistoricalAccessAddrs string // the list of all access nodes from previous spork - MaxMsgSize uint // GRPC max message size - ExecutionClientTimeout time.Duration // execution API GRPC client timeout - CollectionClientTimeout time.Duration // collection API GRPC client timeout - ConnectionPoolSize uint // size of the cache for storing collection and execution connections - MaxHeightRange uint // max size of height range requests - PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs - FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs - ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node + UnsecureGRPCListenAddr string // the non-secure GRPC server address as ip:port + SecureGRPCListenAddr string // the secure GRPC server address as ip:port + TransportCredentials credentials.TransportCredentials // the secure GRPC credentials + HTTPListenAddr string // the HTTP web proxy address as ip:port + RESTListenAddr string // the REST server address as ip:port (if empty the REST server will not be started) + CollectionAddr string // the address of the upstream collection node + HistoricalAccessAddrs string // the list of all access nodes from previous spork + + BackendConfig backend.Config // configurable options for creating Backend + MaxMsgSize uint // GRPC max message size } // Engine exposes the server with a simplified version of the Access API. @@ -68,95 +61,31 @@ type Engine struct { config Config chain flow.Chain + restHandler access.API + addrLock sync.RWMutex restAPIAddress net.Addr } +type Option func(*RPCEngineBuilder) // NewBuilder returns a new RPC engine builder. func NewBuilder(log zerolog.Logger, state protocol.State, config Config, - collectionRPC accessproto.AccessAPIClient, - historicalAccessNodes []accessproto.AccessAPIClient, - blocks storage.Blocks, - headers storage.Headers, - collections storage.Collections, - transactions storage.Transactions, - executionReceipts storage.ExecutionReceipts, - executionResults storage.ExecutionResults, chainID flow.ChainID, accessMetrics module.AccessMetrics, - collectionGRPCPort uint, - executionGRPCPort uint, - retryEnabled bool, rpcMetricsEnabled bool, me module.Local, + backend *backend.Backend, + restHandler access.API, secureGrpcServer *grpcserver.GrpcServer, unsecureGrpcServer *grpcserver.GrpcServer, ) (*RPCEngineBuilder, error) { - log = log.With().Str("engine", "rpc").Logger() // wrap the unsecured server with an HTTP proxy server to serve HTTP clients httpServer := newHTTPProxyServer(unsecureGrpcServer.Server) - var cache *lru.Cache - cacheSize := config.ConnectionPoolSize - if cacheSize > 0 { - // TODO: remove this fallback after fixing issues with evictions - // It was observed that evictions cause connection errors for in flight requests. This works around - // the issue by forcing hte pool size to be greater than the number of ENs + LNs - if cacheSize < backend.DefaultConnectionPoolSize { - log.Warn().Msg("connection pool size below threshold, setting pool size to default value ") - cacheSize = backend.DefaultConnectionPoolSize - } - var err error - cache, err = lru.NewWithEvict(int(cacheSize), func(_, evictedValue interface{}) { - store := evictedValue.(*backend.CachedClient) - store.Close() - log.Debug().Str("grpc_conn_evicted", store.Address).Msg("closing grpc connection evicted from pool") - if accessMetrics != nil { - accessMetrics.ConnectionFromPoolEvicted() - } - }) - if err != nil { - return nil, fmt.Errorf("could not initialize connection pool cache: %w", err) - } - } - - connectionFactory := &backend.ConnectionFactoryImpl{ - CollectionGRPCPort: collectionGRPCPort, - ExecutionGRPCPort: executionGRPCPort, - CollectionNodeGRPCTimeout: config.CollectionClientTimeout, - ExecutionNodeGRPCTimeout: config.ExecutionClientTimeout, - ConnectionsCache: cache, - CacheSize: cacheSize, - MaxMsgSize: config.MaxMsgSize, - AccessMetrics: accessMetrics, - Log: log, - } - - backend := backend.New(state, - collectionRPC, - historicalAccessNodes, - blocks, - headers, - collections, - transactions, - executionReceipts, - executionResults, - chainID, - accessMetrics, - connectionFactory, - retryEnabled, - config.MaxHeightRange, - config.PreferredExecutionNodeIDs, - config.FixedExecutionNodeIDs, - log, - backend.DefaultSnapshotHistoryLimit, - config.ArchiveAddressList, - ) - finalizedCache, finalizedCacheWorker, err := events.NewFinalizedHeaderCache(state) if err != nil { return nil, fmt.Errorf("could not create header cache: %w", err) @@ -173,6 +102,7 @@ func NewBuilder(log zerolog.Logger, config: config, chain: chainID.Chain(), restCollector: accessMetrics, + restHandler: restHandler, } backendNotifierActor, backendNotifierWorker := events.NewFinalizationActor(eng.notifyBackendOnBlockFinalized) eng.backendNotifierActor = backendNotifierActor @@ -282,7 +212,7 @@ func (e *Engine) serveREST(ctx irrecoverable.SignalerContext, ready component.Re e.log.Info().Str("rest_api_address", e.config.RESTListenAddr).Msg("starting REST server on address") - r, err := rest.NewServer(e.backend, e.config.RESTListenAddr, e.log, e.chain, e.restCollector) + r, err := rest.NewServer(e.restHandler, e.config.RESTListenAddr, e.log, e.chain, e.restCollector) if err != nil { e.log.Err(err).Msg("failed to initialize the REST server") ctx.Throw(err) diff --git a/engine/access/rpc/engine_builder.go b/engine/access/rpc/engine_builder.go index a2630d26df8..370f3d0fff4 100644 --- a/engine/access/rpc/engine_builder.go +++ b/engine/access/rpc/engine_builder.go @@ -5,13 +5,13 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" - legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access" - "github.com/onflow/flow-go/access" legacyaccess "github.com/onflow/flow-go/access/legacy" "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/module" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + legacyaccessproto "github.com/onflow/flow/protobuf/go/flow/legacy/access" ) type RPCEngineBuilder struct { @@ -21,7 +21,7 @@ type RPCEngineBuilder struct { // optional parameters, only one can be set during build phase signerIndicesDecoder hotstuff.BlockSignerDecoder - handler accessproto.AccessAPIServer // Use the parent interface instead of implementation, so that we can assign it to proxy. + rpcHandler accessproto.AccessAPIServer // Use the parent interface instead of implementation, so that we can assign it to proxy. } // NewRPCEngineBuilder helps to build a new RPC engine. @@ -34,8 +34,8 @@ func NewRPCEngineBuilder(engine *Engine, me module.Local, finalizedHeaderCache m } } -func (builder *RPCEngineBuilder) Handler() accessproto.AccessAPIServer { - return builder.handler +func (builder *RPCEngineBuilder) RpcHandler() accessproto.AccessAPIServer { + return builder.rpcHandler } // WithBlockSignerDecoder specifies that signer indices in block headers should be translated @@ -51,15 +51,15 @@ func (builder *RPCEngineBuilder) WithBlockSignerDecoder(signerIndicesDecoder hot return builder } -// WithNewHandler specifies that the given `AccessAPIServer` should be used for serving API queries. +// WithRpcHandler specifies that the given `AccessAPIServer` should be used for serving API queries. // Caution: // you can inject either a `BlockSignerDecoder` (via method `WithBlockSignerDecoder`) -// or an `AccessAPIServer` (via method `WithNewHandler`); but not both. If both are +// or an `AccessAPIServer` (via method `WithRpcHandler`); but not both. If both are // specified, the builder will error during the build step. // // Returns self-reference for chaining. -func (builder *RPCEngineBuilder) WithNewHandler(handler accessproto.AccessAPIServer) *RPCEngineBuilder { - builder.handler = handler +func (builder *RPCEngineBuilder) WithRpcHandler(handler accessproto.AccessAPIServer) *RPCEngineBuilder { + builder.rpcHandler = handler return builder } @@ -89,18 +89,18 @@ func (builder *RPCEngineBuilder) WithMetrics() *RPCEngineBuilder { } func (builder *RPCEngineBuilder) Build() (*Engine, error) { - if builder.signerIndicesDecoder != nil && builder.handler != nil { + if builder.signerIndicesDecoder != nil && builder.rpcHandler != nil { return nil, fmt.Errorf("only BlockSignerDecoder (via method `WithBlockSignerDecoder`) or AccessAPIServer (via method `WithNewHandler`) can be specified but not both") } - handler := builder.handler - if handler == nil { + rpcHandler := builder.rpcHandler + if rpcHandler == nil { if builder.signerIndicesDecoder == nil { - handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me) + rpcHandler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me) } else { - handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me, access.WithBlockSignerDecoder(builder.signerIndicesDecoder)) + rpcHandler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me, access.WithBlockSignerDecoder(builder.signerIndicesDecoder)) } } - accessproto.RegisterAccessAPIServer(builder.unsecureGrpcServer.Server, handler) - accessproto.RegisterAccessAPIServer(builder.secureGrpcServer.Server, handler) + accessproto.RegisterAccessAPIServer(builder.unsecureGrpcServer.Server, rpcHandler) + accessproto.RegisterAccessAPIServer(builder.secureGrpcServer.Server, rpcHandler) return builder.Engine, nil } diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 3fce4aeded0..2d210fc358a 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -21,6 +20,7 @@ import ( "google.golang.org/grpc/status" accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" @@ -31,6 +31,8 @@ import ( storagemock "github.com/onflow/flow-go/storage/mock" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/unittest" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) type RateLimitTestSuite struct { @@ -146,8 +148,40 @@ func (suite *RateLimitTestSuite) SetupTest() { block := unittest.BlockHeaderFixture() suite.snapshot.On("Head").Return(block, nil) - rpcEngBuilder, err := NewBuilder(suite.log, suite.state, config, suite.collClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, nil, - nil, suite.chainID, suite.metrics, 0, 0, false, false, suite.me, suite.secureGrpcServer, suite.unsecureGrpcServer) + backend := backend.New( + suite.state, + suite.collClient, + nil, + suite.blocks, + suite.headers, + suite.collections, + suite.transactions, + nil, + nil, + suite.chainID, + suite.metrics, + nil, + false, + 0, + nil, + nil, + suite.log, + 0, + nil) + + rpcEngBuilder, err := NewBuilder( + suite.log, + suite.state, + config, + suite.chainID, + suite.metrics, + false, + suite.me, + backend, + backend, + suite.secureGrpcServer, + suite.unsecureGrpcServer) + require.NoError(suite.T(), err) suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() require.NoError(suite.T(), err) diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index 82fbfa0bd5a..ca403ef6391 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -20,6 +20,7 @@ import ( "github.com/onflow/flow-go/crypto" accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc" + "github.com/onflow/flow-go/engine/access/rpc/backend" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" @@ -130,10 +131,8 @@ func (suite *SecureGRPCTestSuite) SetupTest() { block := unittest.BlockHeaderFixture() suite.snapshot.On("Head").Return(block, nil) - rpcEngBuilder, err := rpc.NewBuilder( - suite.log, + backend := backend.New( suite.state, - config, suite.collClient, nil, suite.blocks, @@ -144,11 +143,25 @@ func (suite *SecureGRPCTestSuite) SetupTest() { nil, suite.chainID, suite.metrics, + nil, + false, 0, + nil, + nil, + suite.log, 0, - false, + nil) + + rpcEngBuilder, err := rpc.NewBuilder( + suite.log, + suite.state, + config, + suite.chainID, + suite.metrics, false, suite.me, + backend, + backend, suite.secureGrpcServer, suite.unsecureGrpcServer, ) diff --git a/engine/common/grpc/forwarder/forwarder.go b/engine/common/grpc/forwarder/forwarder.go new file mode 100644 index 00000000000..b5cf6244d44 --- /dev/null +++ b/engine/common/grpc/forwarder/forwarder.go @@ -0,0 +1,145 @@ +package forwarder + +import ( + "fmt" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" + + "github.com/onflow/flow-go/engine/access/rpc/backend" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/grpcutils" + + "github.com/onflow/flow/protobuf/go/flow/access" +) + +// Forwarder forwards all requests to a set of upstream access nodes or observers +type Forwarder struct { + lock sync.Mutex + roundRobin int + ids flow.IdentityList + upstream []access.AccessAPIClient + connections []*grpc.ClientConn + timeout time.Duration + maxMsgSize uint +} + +func NewForwarder(identities flow.IdentityList, timeout time.Duration, maxMsgSize uint) (*Forwarder, error) { + forwarder := &Forwarder{maxMsgSize: maxMsgSize} + err := forwarder.setFlowAccessAPI(identities, timeout) + return forwarder, err +} + +// setFlowAccessAPI sets a backend access API that forwards some requests to an upstream node. +// It is used by Observer services, Blockchain Data Service, etc. +// Make sure that this is just for observation and not a staked participant in the flow network. +// This means that observers see a copy of the data but there is no interaction to ensure integrity from the root block. +func (f *Forwarder) setFlowAccessAPI(accessNodeAddressAndPort flow.IdentityList, timeout time.Duration) error { + f.timeout = timeout + f.ids = accessNodeAddressAndPort + f.upstream = make([]access.AccessAPIClient, accessNodeAddressAndPort.Count()) + f.connections = make([]*grpc.ClientConn, accessNodeAddressAndPort.Count()) + for i, identity := range accessNodeAddressAndPort { + // Store the faultTolerantClient setup parameters such as address, public, key and timeout, so that + // we can refresh the API on connection loss + f.ids[i] = identity + + // We fail on any single error on startup, so that + // we identify bootstrapping errors early + err := f.reconnectingClient(i) + if err != nil { + return err + } + } + + f.roundRobin = 0 + return nil +} + +// reconnectingClient returns an active client, or +// creates one, if the last one is not ready anymore. +func (f *Forwarder) reconnectingClient(i int) error { + timeout := f.timeout + + if f.connections[i] == nil || f.connections[i].GetState() != connectivity.Ready { + identity := f.ids[i] + var connection *grpc.ClientConn + var err error + if identity.NetworkPubKey == nil { + connection, err = grpc.Dial( + identity.Address, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(f.maxMsgSize))), + grpc.WithTransportCredentials(insecure.NewCredentials()), + backend.WithClientUnaryInterceptor(timeout)) + if err != nil { + return err + } + } else { + tlsConfig, err := grpcutils.DefaultClientTLSConfig(identity.NetworkPubKey) + if err != nil { + return fmt.Errorf("failed to get default TLS client config using public flow networking key %s %w", identity.NetworkPubKey.String(), err) + } + + connection, err = grpc.Dial( + identity.Address, + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(f.maxMsgSize))), + grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), + backend.WithClientUnaryInterceptor(timeout)) + if err != nil { + return fmt.Errorf("cannot connect to %s %w", identity.Address, err) + } + } + connection.Connect() + time.Sleep(1 * time.Second) + state := connection.GetState() + if state != connectivity.Ready && state != connectivity.Connecting { + return fmt.Errorf("%v", state) + } + f.connections[i] = connection + f.upstream[i] = access.NewAccessAPIClient(connection) + } + + return nil +} + +// FaultTolerantClient implements an upstream connection that reconnects on errors +// a reasonable amount of time. +func (f *Forwarder) FaultTolerantClient() (access.AccessAPIClient, error) { + if f.upstream == nil || len(f.upstream) == 0 { + return nil, status.Errorf(codes.Unimplemented, "method not implemented") + } + + // Reasoning: A retry count of three gives an acceptable 5% failure ratio from a 37% failure ratio. + // A bigger number is problematic due to the DNS resolve and connection times, + // plus the need to log and debug each individual connection failure. + // + // This reasoning eliminates the need of making this parameter configurable. + // The logic works rolling over a single connection as well making clean code. + const retryMax = 3 + + f.lock.Lock() + defer f.lock.Unlock() + + var err error + for i := 0; i < retryMax; i++ { + f.roundRobin++ + f.roundRobin = f.roundRobin % len(f.upstream) + err = f.reconnectingClient(f.roundRobin) + if err != nil { + continue + } + state := f.connections[f.roundRobin].GetState() + if state != connectivity.Ready && state != connectivity.Connecting { + continue + } + return f.upstream[f.roundRobin], nil + } + + return nil, status.Errorf(codes.Unavailable, err.Error()) +} diff --git a/engine/common/rpc/convert/blocks.go b/engine/common/rpc/convert/blocks.go index 3c42fffb4c0..2e7f5689515 100644 --- a/engine/common/rpc/convert/blocks.go +++ b/engine/common/rpc/convert/blocks.go @@ -3,10 +3,11 @@ package convert import ( "fmt" - "github.com/onflow/flow/protobuf/go/flow/entities" "google.golang.org/protobuf/types/known/timestamppb" "github.com/onflow/flow-go/model/flow" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) // BlockToMessage converts a flow.Block to a protobuf Block message. diff --git a/engine/common/rpc/convert/collections.go b/engine/common/rpc/convert/collections.go index 00f3f477ccb..69725636449 100644 --- a/engine/common/rpc/convert/collections.go +++ b/engine/common/rpc/convert/collections.go @@ -44,6 +44,18 @@ func LightCollectionToMessage(c *flow.LightCollection) (*entities.Collection, er }, nil } +// MessageToLightCollection converts a protobuf message to a light collection +func MessageToLightCollection(m *entities.Collection) (*flow.LightCollection, error) { + transactions := make([]flow.Identifier, 0, len(m.TransactionIds)) + for _, txId := range m.TransactionIds { + transactions = append(transactions, MessageToIdentifier(txId)) + } + + return &flow.LightCollection{ + Transactions: transactions, + }, nil +} + // CollectionGuaranteeToMessage converts a collection guarantee to a protobuf message func CollectionGuaranteeToMessage(g *flow.CollectionGuarantee) *entities.CollectionGuarantee { id := g.ID() diff --git a/engine/common/rpc/convert/collections_test.go b/engine/common/rpc/convert/collections_test.go index 75ab6f25adc..2e14a6dc225 100644 --- a/engine/common/rpc/convert/collections_test.go +++ b/engine/common/rpc/convert/collections_test.go @@ -9,6 +9,8 @@ import ( "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/utils/unittest" + + "github.com/onflow/flow/protobuf/go/flow/entities" ) // TestConvertCollection tests that converting a collection to a protobuf message results in the correct @@ -32,10 +34,12 @@ func TestConvertCollection(t *testing.T) { } }) - t.Run("convert light collection to message", func(t *testing.T) { - lightCollection := flow.LightCollection{Transactions: txIDs} + var msg *entities.Collection + lightCollection := flow.LightCollection{Transactions: txIDs} - msg, err := convert.LightCollectionToMessage(&lightCollection) + t.Run("convert light collection to message", func(t *testing.T) { + var err error + msg, err = convert.LightCollectionToMessage(&lightCollection) require.NoError(t, err) assert.Len(t, msg.TransactionIds, len(txIDs)) @@ -43,6 +47,16 @@ func TestConvertCollection(t *testing.T) { assert.Equal(t, txID[:], msg.TransactionIds[i]) } }) + + t.Run("convert message to light collection", func(t *testing.T) { + lightColl, err := convert.MessageToLightCollection(msg) + require.NoError(t, err) + + assert.Equal(t, len(txIDs), len(lightColl.Transactions)) + for _, txID := range lightColl.Transactions { + assert.Equal(t, txID[:], txID[:]) + } + }) } // TestConvertCollectionGuarantee tests that converting a collection guarantee to and from a protobuf diff --git a/engine/common/rpc/convert/events.go b/engine/common/rpc/convert/events.go index d3bd469cd48..58ccb0ed9a1 100644 --- a/engine/common/rpc/convert/events.go +++ b/engine/common/rpc/convert/events.go @@ -4,12 +4,16 @@ import ( "encoding/json" "fmt" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/onflow/cadence/encoding/ccf" jsoncdc "github.com/onflow/cadence/encoding/json" - "github.com/onflow/flow/protobuf/go/flow/entities" - execproto "github.com/onflow/flow/protobuf/go/flow/execution" "github.com/onflow/flow-go/model/flow" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/onflow/flow/protobuf/go/flow/entities" + execproto "github.com/onflow/flow/protobuf/go/flow/execution" ) // EventToMessage converts a flow.Event to a protobuf message @@ -172,3 +176,51 @@ func CcfEventToJsonEvent(e flow.Event) (*flow.Event, error) { Payload: convertedPayload, }, nil } + +// MessagesToBlockEvents converts a protobuf EventsResponse_Result messages to a slice of flow.BlockEvents. +func MessagesToBlockEvents(blocksEvents []*accessproto.EventsResponse_Result) []flow.BlockEvents { + evs := make([]flow.BlockEvents, len(blocksEvents)) + for i, ev := range blocksEvents { + evs[i] = MessageToBlockEvents(ev) + } + + return evs +} + +// MessageToBlockEvents converts a protobuf EventsResponse_Result message to a flow.BlockEvents. +func MessageToBlockEvents(blockEvents *accessproto.EventsResponse_Result) flow.BlockEvents { + return flow.BlockEvents{ + BlockHeight: blockEvents.BlockHeight, + BlockID: MessageToIdentifier(blockEvents.BlockId), + BlockTimestamp: blockEvents.BlockTimestamp.AsTime(), + Events: MessagesToEvents(blockEvents.Events), + } +} + +func BlockEventsToMessages(blocks []flow.BlockEvents) ([]*accessproto.EventsResponse_Result, error) { + results := make([]*accessproto.EventsResponse_Result, len(blocks)) + + for i, block := range blocks { + event, err := BlockEventsToMessage(block) + if err != nil { + return nil, err + } + results[i] = event + } + + return results, nil +} + +func BlockEventsToMessage(block flow.BlockEvents) (*accessproto.EventsResponse_Result, error) { + eventMessages := make([]*entities.Event, len(block.Events)) + for i, event := range block.Events { + eventMessages[i] = EventToMessage(event) + } + timestamp := timestamppb.New(block.BlockTimestamp) + return &accessproto.EventsResponse_Result{ + BlockId: block.BlockID[:], + BlockHeight: block.BlockHeight, + BlockTimestamp: timestamp, + Events: eventMessages, + }, nil +} diff --git a/engine/common/rpc/convert/events_test.go b/engine/common/rpc/convert/events_test.go index 2cf010fa011..879db710f8b 100644 --- a/engine/common/rpc/convert/events_test.go +++ b/engine/common/rpc/convert/events_test.go @@ -193,3 +193,24 @@ func TestConvertServiceEventList(t *testing.T) { assert.Equal(t, serviceEvents, converted) } + +// TestConvertMessagesToBlockEvents tests that converting a protobuf EventsResponse_Result message to and from block events in the same +// block +func TestConvertMessagesToBlockEvents(t *testing.T) { + t.Parallel() + + count := 2 + blockEvents := make([]flow.BlockEvents, count) + for i := 0; i < count; i++ { + header := unittest.BlockHeaderFixture(unittest.WithHeaderHeight(uint64(i))) + blockEvents[i] = unittest.BlockEventsFixture(header, 2) + } + + msg, err := convert.BlockEventsToMessages(blockEvents) + require.NoError(t, err) + + converted := convert.MessagesToBlockEvents(msg) + require.NoError(t, err) + + assert.Equal(t, blockEvents, converted) +} diff --git a/go.mod b/go.mod index 4d4874faa2b..b4b7cbe7f62 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/onflow/flow-core-contracts/lib/go/templates v1.2.3 github.com/onflow/flow-go-sdk v0.41.9 github.com/onflow/flow-go/crypto v0.24.9 - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 github.com/pierrec/lz4 v2.6.1+incompatible diff --git a/go.sum b/go.sum index 672d246ad54..97ca856b74d 100644 --- a/go.sum +++ b/go.sum @@ -1259,8 +1259,8 @@ github.com/onflow/flow-go/crypto v0.24.9/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7 github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 h1:6uKg0gpLKpTZKMihrsFR0Gkq++1hykzfR1tQCKuOfw4= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce h1:YQKijiQaq8SF1ayNqp3VVcwbBGXSnuHNHq4GQmVGybE= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d h1:QcOAeEyF3iAUHv21LQ12sdcsr0yFrJGoGLyCAzYYtvI= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d/go.mod h1:GCPpiyRoHncdqPj++zPr9ZOYBX4hpJ0pYZRYqSE8VKk= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/insecure/go.mod b/insecure/go.mod index 02cbccb4c28..9a64f440592 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -186,7 +186,7 @@ require ( github.com/onflow/flow-ft/lib/go/contracts v0.7.0 // indirect github.com/onflow/flow-go-sdk v0.41.9 // indirect github.com/onflow/flow-nft/lib/go/contracts v1.1.0 // indirect - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 // indirect + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce // indirect github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d // indirect github.com/onflow/sdks v0.5.0 // indirect github.com/onflow/wal v0.0.0-20230529184820-bc9f8244608d // indirect diff --git a/insecure/go.sum b/insecure/go.sum index e4153a1db4f..25f044da73e 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -1228,13 +1228,13 @@ github.com/onflow/flow-go-sdk v0.24.0/go.mod h1:IoptMLPyFXWvyd9yYA6/4EmSeeozl6nJ github.com/onflow/flow-go-sdk v0.41.9 h1:cyplhhhc0RnfOAan2t7I/7C9g1hVGDDLUhWj6ZHAkk4= github.com/onflow/flow-go-sdk v0.41.9/go.mod h1:e9Q5TITCy7g08lkdQJxP8fAKBnBoC5FjALvUKr36j4I= github.com/onflow/flow-go/crypto v0.21.3/go.mod h1:vI6V4CY3R6c4JKBxdcRiR/AnjBfL8OSD97bJc60cLuQ= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce h1:YQKijiQaq8SF1ayNqp3VVcwbBGXSnuHNHq4GQmVGybE= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/flow-go/crypto v0.24.9 h1:0EQp+kSZYJepMIiSypfJVe7tzsPcb6UXOdOtsTCDhBs= github.com/onflow/flow-go/crypto v0.24.9/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7Q6u+bCI78lfNX0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 h1:6uKg0gpLKpTZKMihrsFR0Gkq++1hykzfR1tQCKuOfw4= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d h1:QcOAeEyF3iAUHv21LQ12sdcsr0yFrJGoGLyCAzYYtvI= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d/go.mod h1:GCPpiyRoHncdqPj++zPr9ZOYBX4hpJ0pYZRYqSE8VKk= github.com/onflow/sdks v0.5.0 h1:2HCRibwqDaQ1c9oUApnkZtEAhWiNY2GTpRD5+ftdkN8= diff --git a/integration/go.mod b/integration/go.mod index f42b3a76f1c..e47e8bac3c3 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -25,7 +25,7 @@ require ( github.com/onflow/flow-go-sdk v0.41.9 github.com/onflow/flow-go/crypto v0.24.9 github.com/onflow/flow-go/insecure v0.0.0-00010101000000-000000000000 - github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 + github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce github.com/plus3it/gorecurcopy v0.0.1 github.com/prometheus/client_golang v1.14.0 github.com/prometheus/client_model v0.4.0 diff --git a/integration/go.sum b/integration/go.sum index 9649fd57d60..4aac8d7305d 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -1373,8 +1373,8 @@ github.com/onflow/flow-go/crypto v0.24.9/go.mod h1:fqCzkIBBMRRkciVrvW21rECKq1oD7 github.com/onflow/flow-nft/lib/go/contracts v1.1.0 h1:rhUDeD27jhLwOqQKI/23008CYfnqXErrJvc4EFRP2a0= github.com/onflow/flow-nft/lib/go/contracts v1.1.0/go.mod h1:YsvzYng4htDgRB9sa9jxdwoTuuhjK8WYWXTyLkIigZY= github.com/onflow/flow/protobuf/go/flow v0.2.2/go.mod h1:gQxYqCfkI8lpnKsmIjwtN2mV/N2PIwc1I+RUK4HPIc8= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391 h1:6uKg0gpLKpTZKMihrsFR0Gkq++1hykzfR1tQCKuOfw4= -github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230602212908-08fc6536d391/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce h1:YQKijiQaq8SF1ayNqp3VVcwbBGXSnuHNHq4GQmVGybE= +github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20230628215638-83439d22e0ce/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d h1:QcOAeEyF3iAUHv21LQ12sdcsr0yFrJGoGLyCAzYYtvI= github.com/onflow/go-bitswap v0.0.0-20230703214630-6d3db958c73d/go.mod h1:GCPpiyRoHncdqPj++zPr9ZOYBX4hpJ0pYZRYqSE8VKk= github.com/onflow/nft-storefront/lib/go/contracts v0.0.0-20221222181731-14b90207cead h1:2j1Unqs76Z1b95Gu4C3Y28hzNUHBix7wL490e61SMSw= diff --git a/integration/localnet/builder/bootstrap.go b/integration/localnet/builder/bootstrap.go index 0bd709b9203..dc54dba8cdb 100644 --- a/integration/localnet/builder/bootstrap.go +++ b/integration/localnet/builder/bootstrap.go @@ -453,12 +453,14 @@ func prepareObserverService(i int, observerName string, agPublicKey string) Serv fmt.Sprintf("--rpc-addr=%s:%s", observerName, testnet.GRPCPort), fmt.Sprintf("--secure-rpc-addr=%s:%s", observerName, testnet.GRPCSecurePort), fmt.Sprintf("--http-addr=%s:%s", observerName, testnet.GRPCWebPort), + fmt.Sprintf("--rest-addr=%s:%s", observerName, testnet.RESTPort), ) service.AddExposedPorts( testnet.GRPCPort, testnet.GRPCSecurePort, testnet.GRPCWebPort, + testnet.RESTPort, ) // observer services rely on the access gateway diff --git a/integration/testnet/network.go b/integration/testnet/network.go index 9f060dd0532..8b9522d7ba6 100644 --- a/integration/testnet/network.go +++ b/integration/testnet/network.go @@ -768,6 +768,9 @@ func (net *FlowNetwork) addObserver(t *testing.T, conf ObserverConfig) { nodeContainer.exposePort(AdminPort, testingdock.RandomPort(t)) nodeContainer.AddFlag("admin-addr", nodeContainer.ContainerAddr(AdminPort)) + nodeContainer.exposePort(RESTPort, testingdock.RandomPort(t)) + nodeContainer.AddFlag("rest-addr", nodeContainer.ContainerAddr(RESTPort)) + nodeContainer.opts.HealthCheck = testingdock.HealthCheckCustom(nodeContainer.HealthcheckCallback()) suiteContainer := net.suite.Container(containerOpts) diff --git a/integration/tests/access/observer_test.go b/integration/tests/access/observer_test.go index 29b96da49e6..25bfeab2f3a 100644 --- a/integration/tests/access/observer_test.go +++ b/integration/tests/access/observer_test.go @@ -1,22 +1,29 @@ package access import ( + "bytes" "context" + "encoding/json" + "fmt" + "net/http" "testing" - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/integration/testnet" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/utils/unittest" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" ) func TestObserver(t *testing.T) { @@ -25,9 +32,10 @@ func TestObserver(t *testing.T) { type ObserverSuite struct { suite.Suite - net *testnet.FlowNetwork - teardown func() - local map[string]struct{} + net *testnet.FlowNetwork + teardown func() + localRpc map[string]struct{} + localRest map[string]struct{} cancel context.CancelFunc } @@ -44,7 +52,7 @@ func (s *ObserverSuite) TearDownTest() { } func (s *ObserverSuite) SetupTest() { - s.local = map[string]struct{}{ + s.localRpc = map[string]struct{}{ "Ping": {}, "GetLatestBlockHeader": {}, "GetBlockHeaderByID": {}, @@ -56,18 +64,26 @@ func (s *ObserverSuite) SetupTest() { "GetNetworkParameters": {}, } + s.localRest = map[string]struct{}{ + "getBlocksByIDs": {}, + "getBlocksByHeight": {}, + "getBlockPayloadByID": {}, + "getNetworkParameters": {}, + "getNodeVersionInfo": {}, + } + nodeConfigs := []testnet.NodeConfig{ // access node with unstaked nodes supported testnet.NewNodeConfig(flow.RoleAccess, testnet.WithLogLevel(zerolog.InfoLevel), testnet.WithAdditionalFlag("--supports-observer=true")), - // need one dummy execution node (unused ghost) - testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel), testnet.AsGhost()), + // need one dummy execution node + testnet.NewNodeConfig(flow.RoleExecution, testnet.WithLogLevel(zerolog.FatalLevel)), // need one dummy verification node (unused ghost) testnet.NewNodeConfig(flow.RoleVerification, testnet.WithLogLevel(zerolog.FatalLevel), testnet.AsGhost()), - // need one controllable collection node (unused ghost) - testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel), testnet.AsGhost()), + // need one controllable collection node + testnet.NewNodeConfig(flow.RoleCollection, testnet.WithLogLevel(zerolog.FatalLevel)), // need three consensus nodes (unused ghost) testnet.NewNodeConfig(flow.RoleConsensus, testnet.WithLogLevel(zerolog.FatalLevel), testnet.AsGhost()), @@ -90,11 +106,11 @@ func (s *ObserverSuite) SetupTest() { s.net.Start(ctx) } -// TestObserver runs the following tests: +// TestObserverRPC runs the following tests: // 1. CompareRPCs: verifies that the observer client returns the same errors as the access client for rpcs proxied to the upstream AN // 2. HandledByUpstream: stops the upstream AN and verifies that the observer client returns errors for all rpcs handled by the upstream // 3. HandledByObserver: stops the upstream AN and verifies that the observer client handles all other queries -func (s *ObserverSuite) TestObserver() { +func (s *ObserverSuite) TestObserverRPC() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -111,7 +127,7 @@ func (s *ObserverSuite) TestObserver() { // verify that both clients return the same errors for proxied rpcs for _, rpc := range s.getRPCs() { // skip rpcs handled locally by observer - if _, local := s.local[rpc.name]; local { + if _, local := s.localRpc[rpc.name]; local { continue } t.Run(rpc.name, func(t *testing.T) { @@ -129,7 +145,7 @@ func (s *ObserverSuite) TestObserver() { t.Run("HandledByUpstream", func(t *testing.T) { // verify that we receive Unavailable errors from all rpcs handled upstream for _, rpc := range s.getRPCs() { - if _, local := s.local[rpc.name]; local { + if _, local := s.localRpc[rpc.name]; local { continue } t.Run(rpc.name, func(t *testing.T) { @@ -142,7 +158,7 @@ func (s *ObserverSuite) TestObserver() { t.Run("HandledByObserver", func(t *testing.T) { // verify that we receive NotFound or no error from all rpcs handled locally for _, rpc := range s.getRPCs() { - if _, local := s.local[rpc.name]; !local { + if _, local := s.localRpc[rpc.name]; !local { continue } t.Run(rpc.name, func(t *testing.T) { @@ -154,7 +170,90 @@ func (s *ObserverSuite) TestObserver() { }) } }) +} + +// TestObserverRest runs the following tests: +// 1. CompareEndpoints: verifies that the observer client returns the same errors as the access client for rests proxied to the upstream AN +// 2. HandledByUpstream: stops the upstream AN and verifies that the observer client returns errors for all rests handled by the upstream +// 3. HandledByObserver: stops the upstream AN and verifies that the observer client handles all other queries +func (s *ObserverSuite) TestObserverRest() { + t := s.T() + + accessAddr := s.net.ContainerByName(testnet.PrimaryAN).Addr(testnet.RESTPort) + observerAddr := s.net.ContainerByName("observer_1").Addr(testnet.RESTPort) + + httpClient := http.DefaultClient + makeHttpCall := func(method string, url string, body interface{}) (*http.Response, error) { + switch method { + case http.MethodGet: + return httpClient.Get(url) + case http.MethodPost: + jsonBody, _ := json.Marshal(body) + return httpClient.Post(url, "application/json", bytes.NewBuffer(jsonBody)) + } + panic("not supported") + } + makeObserverCall := func(method string, path string, body interface{}) (*http.Response, error) { + return makeHttpCall(method, "http://"+observerAddr+"/v1"+path, body) + } + makeAccessCall := func(method string, path string, body interface{}) (*http.Response, error) { + return makeHttpCall(method, "http://"+accessAddr+"/v1"+path, body) + } + + t.Run("CompareEndpoints", func(t *testing.T) { + // verify that both clients return the same errors for proxied rests + for _, endpoint := range s.getRestEndpoints() { + // skip rest handled locally by observer + if _, local := s.localRest[endpoint.name]; local { + continue + } + t.Run(endpoint.name, func(t *testing.T) { + accessResp, accessErr := makeAccessCall(endpoint.method, endpoint.path, endpoint.body) + observerResp, observerErr := makeObserverCall(endpoint.method, endpoint.path, endpoint.body) + assert.NoError(t, accessErr) + assert.NoError(t, observerErr) + assert.Equal(t, accessResp.Status, observerResp.Status) + assert.Equal(t, accessResp.StatusCode, observerResp.StatusCode) + assert.Contains(t, [...]int{ + http.StatusNotFound, + http.StatusOK, + }, observerResp.StatusCode) + }) + } + }) + + // stop the upstream access container + err := s.net.StopContainerByName(context.Background(), testnet.PrimaryAN) + require.NoError(t, err) + + t.Run("HandledByUpstream", func(t *testing.T) { + // verify that we receive StatusInternalServerError, StatusServiceUnavailable errors from all rests handled upstream + for _, endpoint := range s.getRestEndpoints() { + if _, local := s.localRest[endpoint.name]; local { + continue + } + t.Run(endpoint.name, func(t *testing.T) { + observerResp, observerErr := makeObserverCall(endpoint.method, endpoint.path, endpoint.body) + require.NoError(t, observerErr) + assert.Contains(t, [...]int{ + http.StatusServiceUnavailable}, observerResp.StatusCode) + }) + } + }) + t.Run("HandledByObserver", func(t *testing.T) { + // verify that we receive NotFound or no error from all rests handled locally + for _, endpoint := range s.getRestEndpoints() { + if _, local := s.localRest[endpoint.name]; !local { + continue + } + t.Run(endpoint.name, func(t *testing.T) { + observerResp, observerErr := makeObserverCall(endpoint.method, endpoint.path, endpoint.body) + require.NoError(t, observerErr) + assert.Contains(t, [...]int{http.StatusNotFound, http.StatusOK}, observerResp.StatusCode) + }) + } + }) } func (s *ObserverSuite) getAccessClient() (accessproto.AccessAPIClient, error) { @@ -287,3 +386,126 @@ func (s *ObserverSuite) getRPCs() []RPCTest { }}, } } + +type RestEndpointTest struct { + name string + method string + path string + body interface{} +} + +func (s *ObserverSuite) getRestEndpoints() []RestEndpointTest { + transactionId := unittest.IdentifierFixture().String() + account := flow.Localnet.Chain().ServiceAddress().String() + block := unittest.BlockFixture() + executionResult := unittest.ExecutionResultFixture() + collection := unittest.CollectionFixture(2) + eventType := "A.0123456789abcdef.flow.event" + + return []RestEndpointTest{ + { + name: "getTransactionByID", + method: http.MethodGet, + path: "/transactions/" + transactionId, + }, + { + name: "createTransaction", + method: http.MethodPost, + path: "/transactions", + body: createTx(s.net), + }, + { + name: "getTransactionResultByID", + method: http.MethodGet, + path: fmt.Sprintf("/transaction_results/%s?block_id=%s&collection_id=%s", transactionId, block.ID().String(), collection.ID().String()), + }, + { + name: "getBlocksByIDs", + method: http.MethodGet, + path: "/blocks/" + block.ID().String(), + }, + { + name: "getBlocksByHeight", + method: http.MethodGet, + path: "/blocks?height=1", + }, + { + name: "getBlockPayloadByID", + method: http.MethodGet, + path: "/blocks/" + block.ID().String() + "/payload", + }, + { + name: "getExecutionResultByID", + method: http.MethodGet, + path: "/execution_results/" + executionResult.ID().String(), + }, + { + name: "getExecutionResultByBlockID", + method: http.MethodGet, + path: "/execution_results?block_id=" + block.ID().String(), + }, + { + name: "getCollectionByID", + method: http.MethodGet, + path: "/collections/" + collection.ID().String(), + }, + { + name: "executeScript", + method: http.MethodPost, + path: "/scripts", + body: createScript(), + }, + { + name: "getAccount", + method: http.MethodGet, + path: "/accounts/" + account + "?block_height=1", + }, + { + name: "getEvents", + method: http.MethodGet, + path: fmt.Sprintf("/events?type=%s&start_height=%d&end_height=%d", eventType, 0, 3), + }, + { + name: "getNetworkParameters", + method: http.MethodGet, + path: "/network/parameters", + }, + { + name: "getNodeVersionInfo", + method: http.MethodGet, + path: "/node_version_info", + }, + } +} + +func createTx(net *testnet.FlowNetwork) interface{} { + flowAddr := flow.Localnet.Chain().ServiceAddress() + payloadSignature := unittest.TransactionSignatureFixture() + envelopeSignature := unittest.TransactionSignatureFixture() + + payloadSignature.Address = flowAddr + + envelopeSignature.Address = flowAddr + envelopeSignature.KeyIndex = 2 + + tx := flow.NewTransactionBody(). + AddAuthorizer(flowAddr). + SetPayer(flowAddr). + SetScript(unittest.NoopTxScript()). + SetReferenceBlockID(net.Root().ID()). + SetProposalKey(flowAddr, 1, 0) + tx.PayloadSignatures = []flow.TransactionSignature{payloadSignature} + tx.EnvelopeSignatures = []flow.TransactionSignature{envelopeSignature} + + return unittest.CreateSendTxHttpPayload(*tx) +} + +func createScript() interface{} { + validCode := []byte(`pub fun main(foo: String): String { return foo }`) + validArgs := []byte(`{ "type": "String", "value": "hello world" }`) + body := map[string]interface{}{ + "script": util.ToBase64(validCode), + "arguments": []string{util.ToBase64(validArgs)}, + } + return body +} diff --git a/module/metrics/noop.go b/module/metrics/noop.go index 226af7f1f26..9bf5be48f0d 100644 --- a/module/metrics/noop.go +++ b/module/metrics/noop.go @@ -4,6 +4,8 @@ import ( "context" "time" + "google.golang.org/grpc/codes" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" @@ -304,3 +306,7 @@ func (nc *NoopCollector) AsyncProcessingFinished(string, time.Duration) func (nc *NoopCollector) OnMisbehaviorReported(string, string) {} func (nc *NoopCollector) OnViolationReportSkipped() {} + +var _ ObserverMetrics = (*NoopCollector)(nil) + +func (nc *NoopCollector) RecordRPC(handler, rpc string, code codes.Code) {} diff --git a/module/metrics/observer.go b/module/metrics/observer.go index 4e885c9bf4c..375aa66a2ac 100644 --- a/module/metrics/observer.go +++ b/module/metrics/observer.go @@ -6,10 +6,16 @@ import ( "google.golang.org/grpc/codes" ) +type ObserverMetrics interface { + RecordRPC(handler, rpc string, code codes.Code) +} + type ObserverCollector struct { rpcs *prometheus.CounterVec } +var _ ObserverMetrics = (*ObserverCollector)(nil) + func NewObserverCollector() *ObserverCollector { return &ObserverCollector{ rpcs: promauto.NewCounterVec(prometheus.CounterOpts{ diff --git a/utils/unittest/fixtures.go b/utils/unittest/fixtures.go index 2d090c38455..999f090232b 100644 --- a/utils/unittest/fixtures.go +++ b/utils/unittest/fixtures.go @@ -20,6 +20,7 @@ import ( "github.com/onflow/flow-go/crypto" "github.com/onflow/flow-go/crypto/hash" "github.com/onflow/flow-go/engine" + "github.com/onflow/flow-go/engine/access/rest/util" "github.com/onflow/flow-go/fvm/storage/snapshot" "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/ledger/common/bitutils" @@ -2506,3 +2507,35 @@ func ChunkExecutionDataFixture(t *testing.T, minSize int, opts ...func(*executio size *= 2 } } + +func CreateSendTxHttpPayload(tx flow.TransactionBody) map[string]interface{} { + tx.Arguments = [][]uint8{} // fix how fixture creates nil values + auth := make([]string, len(tx.Authorizers)) + for i, a := range tx.Authorizers { + auth[i] = a.String() + } + + return map[string]interface{}{ + "script": util.ToBase64(tx.Script), + "arguments": tx.Arguments, + "reference_block_id": tx.ReferenceBlockID.String(), + "gas_limit": fmt.Sprintf("%d", tx.GasLimit), + "payer": tx.Payer.String(), + "proposal_key": map[string]interface{}{ + "address": tx.ProposalKey.Address.String(), + "key_index": fmt.Sprintf("%d", tx.ProposalKey.KeyIndex), + "sequence_number": fmt.Sprintf("%d", tx.ProposalKey.SequenceNumber), + }, + "authorizers": auth, + "payload_signatures": []map[string]interface{}{{ + "address": tx.PayloadSignatures[0].Address.String(), + "key_index": fmt.Sprintf("%d", tx.PayloadSignatures[0].KeyIndex), + "signature": util.ToBase64(tx.PayloadSignatures[0].Signature), + }}, + "envelope_signatures": []map[string]interface{}{{ + "address": tx.EnvelopeSignatures[0].Address.String(), + "key_index": fmt.Sprintf("%d", tx.EnvelopeSignatures[0].KeyIndex), + "signature": util.ToBase64(tx.EnvelopeSignatures[0].Signature), + }}, + } +}