Skip to content

Commit

Permalink
Merge #4499
Browse files Browse the repository at this point in the history
4499: [Observer] Enable REST API on observers r=peterargue a=UlyanaAndrukhiv

#3138

# Context
This pull request adds a new RestServerApi interface for REST service, ServerRequestHandler - structure that implements RestServerApi and represents local requests, RestForwarder- structure that also implements RestServerApi and handles the request forwarding to upstream. According to similar API proxy pattern adds RestRouter for observer node - structure that represents the routing proxy algorithm and splits requests between a local and forward requests which can't be handled locally to an upstream access node. 

As part of this PR:
* The protobuf have been extended (added GetExecutionResultByID request/response, see onflow/flow#1346).
* Tests for the RestRouter have been added, RestServerApi mock have been generated and added.
* Integration TestObserverRest test have been added.

Co-authored-by: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Co-authored-by: Uliana Andrukhiv <u.andrukhiv@gmail.com>
  • Loading branch information
bors[bot] and UlyanaAndrukhiv authored Jul 21, 2023
2 parents 3f3c15f + c7c05b9 commit bd3625e
Show file tree
Hide file tree
Showing 58 changed files with 1,499 additions and 528 deletions.
59 changes: 26 additions & 33 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package access
import (
"context"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
)

type Handler struct {
Expand Down Expand Up @@ -516,7 +516,7 @@ func (h *Handler) GetEventsForHeightRange(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -548,7 +548,7 @@ func (h *Handler) GetEventsForBlockIDs(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,6 +590,27 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.
return executionResultToMessages(result, metadata)
}

// GetExecutionResultByID returns the execution result for the given ID.
func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) {
metadata := h.buildMetadataResponse()

blockID := convert.MessageToIdentifier(req.GetId())

result, err := h.api.GetExecutionResultByID(ctx, blockID)
if err != nil {
return nil, err
}

execResult, err := convert.ExecutionResultToMessage(result)
if err != nil {
return nil, err
}
return &access.ExecutionResultByIDResponse{
ExecutionResult: execResult,
Metadata: metadata,
}, nil
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
metadata := h.buildMetadataResponse()

Expand Down Expand Up @@ -659,34 +680,6 @@ func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Meta
}, nil
}

func blockEventsToMessages(blocks []flow.BlockEvents) ([]*access.EventsResponse_Result, error) {
results := make([]*access.EventsResponse_Result, len(blocks))

for i, block := range blocks {
event, err := blockEventsToMessage(block)
if err != nil {
return nil, err
}
results[i] = event
}

return results, nil
}

func blockEventsToMessage(block flow.BlockEvents) (*access.EventsResponse_Result, error) {
eventMessages := make([]*entities.Event, len(block.Events))
for i, event := range block.Events {
eventMessages[i] = convert.EventToMessage(event)
}
timestamp := timestamppb.New(block.BlockTimestamp)
return &access.EventsResponse_Result{
BlockId: block.BlockID[:],
BlockHeight: block.BlockHeight,
BlockTimestamp: timestamp,
Events: eventMessages,
}, nil
}

// WithBlockSignerDecoder configures the Handler to decode signer indices
// via the provided hotstuff.BlockSignerDecoder
func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) func(*Handler) {
Expand Down
95 changes: 66 additions & 29 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine/access/ingestion"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
Expand Down Expand Up @@ -147,20 +147,22 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
collectionGRPCPort: 9000,
executionGRPCPort: 9000,
rpcConf: rpc.Config{
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
ConnectionPoolSize: backend.DefaultConnectionPoolSize,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
BackendConfig: backend.Config{
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
ConnectionPoolSize: backend.DefaultConnectionPoolSize,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
stateStreamConf: state_stream.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -667,15 +669,15 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)")
flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to")
flags.StringVarP(&builder.ExecutionNodeAddress, "script-addr", "s", defaultConfig.ExecutionNodeAddress, "the address (of the execution node) forward the script to")
flags.StringSliceVar(&builder.rpcConf.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.ArchiveAddressList, "the list of address of the archive node to forward the script queries to")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.BackendConfig.ArchiveAddressList, "the list of address of the archive node to forward the script queries to")
flags.StringVarP(&builder.rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", defaultConfig.rpcConf.HistoricalAccessAddrs, "comma separated rpc addresses for historical access nodes")
flags.DurationVar(&builder.rpcConf.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.CollectionClientTimeout, "grpc client timeout for a collection node")
flags.DurationVar(&builder.rpcConf.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.ExecutionClientTimeout, "grpc client timeout for an execution node")
flags.UintVar(&builder.rpcConf.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.DurationVar(&builder.rpcConf.BackendConfig.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.BackendConfig.CollectionClientTimeout, "grpc client timeout for a collection node")
flags.DurationVar(&builder.rpcConf.BackendConfig.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.BackendConfig.ExecutionClientTimeout, "grpc client timeout for an execution node")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&builder.rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&builder.rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.BackendConfig.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed")
flags.BoolVar(&builder.logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed")
Expand Down Expand Up @@ -915,7 +917,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.rpcConf.CollectionAddr,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(builder.rpcConf.MaxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
backend.WithClientUnaryInterceptor(builder.rpcConf.CollectionClientTimeout))
backend.WithClientUnaryInterceptor(builder.rpcConf.BackendConfig.CollectionClientTimeout))
if err != nil {
return err
}
Expand Down Expand Up @@ -972,7 +974,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("rest metrics", func(node *cmd.NodeConfig) error {
m, err := metrics.NewRestCollector(rest.URLToRoute, node.MetricsRegisterer)
m, err := metrics.NewRestCollector(routes.URLToRoute, node.MetricsRegisterer)
if err != nil {
return err
}
Expand Down Expand Up @@ -1034,10 +1036,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engineBuilder, err := rpc.NewBuilder(
node.Logger,
config := builder.rpcConf
backendConfig := config.BackendConfig
accessMetrics := builder.AccessMetrics

backendCache, cacheSize, err := backend.NewCache(node.Logger,
accessMetrics,
backendConfig.ConnectionPoolSize)
if err != nil {
return nil, fmt.Errorf("could not initialize backend cache: %w", err)
}

connFactory := &backend.ConnectionFactoryImpl{
CollectionGRPCPort: builder.collectionGRPCPort,
ExecutionGRPCPort: builder.executionGRPCPort,
CollectionNodeGRPCTimeout: backendConfig.CollectionClientTimeout,
ExecutionNodeGRPCTimeout: backendConfig.ExecutionClientTimeout,
ConnectionsCache: backendCache,
CacheSize: cacheSize,
MaxMsgSize: config.MaxMsgSize,
AccessMetrics: accessMetrics,
Log: node.Logger,
}

backend := backend.New(
node.State,
builder.rpcConf,
builder.CollectionRPC,
builder.HistoricalAccessRPCs,
node.Storage.Blocks,
Expand All @@ -1048,11 +1071,25 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.RootChainID,
builder.AccessMetrics,
builder.collectionGRPCPort,
builder.executionGRPCPort,
connFactory,
builder.retryEnabled,
backendConfig.MaxHeightRange,
backendConfig.PreferredExecutionNodeIDs,
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList)

engineBuilder, err := rpc.NewBuilder(
node.Logger,
node.State,
config,
node.RootChainID,
builder.AccessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
backend,
backend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
Expand Down
Loading

0 comments on commit bd3625e

Please sign in to comment.