Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Observer] Enable REST API on observers #4499

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
6938f40
Created new RestServerApi, refactored rest router and server, added B…
UlyanaAndrukhiv Jun 8, 2023
fcbf397
Separated common forwarder logic to forwarder package, created and im…
UlyanaAndrukhiv Jun 8, 2023
a685db8
Updated access api proxy test
UlyanaAndrukhiv Jun 8, 2023
557c6dd
Added RestRouter, refactored engine and engine builder
UlyanaAndrukhiv Jun 16, 2023
0a8aa0d
Updated tests
UlyanaAndrukhiv Jun 16, 2023
ee5b7c5
Refactored tests ande added new tests for rest servise. Linted.
UlyanaAndrukhiv Jun 19, 2023
23b4cd0
Added observer rest integration test, linted
UlyanaAndrukhiv Jun 21, 2023
ea525fd
Fixed MaxMsgSize argument
UlyanaAndrukhiv Jun 21, 2023
d8500ca
Merged with master
UlyanaAndrukhiv Jun 22, 2023
8c8cd48
Created Config structure for backend
UlyanaAndrukhiv Jun 22, 2023
2368ca2
Removed old unnecessary comment, added small upgrade for ObserverColl…
UlyanaAndrukhiv Jun 23, 2023
8854978
Refactored rest api
UlyanaAndrukhiv Jun 23, 2023
2db0deb
Added more comments
UlyanaAndrukhiv Jun 23, 2023
5425957
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jun 23, 2023
264f86f
Updated rest README
UlyanaAndrukhiv Jun 23, 2023
12f69d2
Moved forwarder package to engine/common/grpc/forwarder, added constr…
UlyanaAndrukhiv Jun 26, 2023
0e76e55
Refactored request forwarding according to suggestions from code review
UlyanaAndrukhiv Jun 29, 2023
280fa42
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jun 29, 2023
9deccb8
Added constructor to rest proxy handler, updated test
UlyanaAndrukhiv Jun 29, 2023
bbe70d0
Changed flow/protobuf/go/flow module to latest, removed lines with re…
UlyanaAndrukhiv Jun 29, 2023
276a101
Refactored according to commits
UlyanaAndrukhiv Jun 30, 2023
74c47d5
Removed RestBackendApi and refactored rest using standard Access API'…
UlyanaAndrukhiv Jun 30, 2023
f9a15e8
Updated logs, added CLI flag and added access metrics initialization
UlyanaAndrukhiv Jun 30, 2023
aee37a9
Updated README file, updated comment
UlyanaAndrukhiv Jun 30, 2023
d1ebcb7
Merged with master
UlyanaAndrukhiv Jun 30, 2023
e46ee77
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jun 30, 2023
3d5dfc4
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jul 4, 2023
92674a7
Updated README file, updated logging accoring to comments
UlyanaAndrukhiv Jul 10, 2023
be61031
Merged with master
UlyanaAndrukhiv Jul 10, 2023
9256de4
Added tests for convert functions, moved 2 to right file
UlyanaAndrukhiv Jul 10, 2023
c467235
Updated unittest according to comment
UlyanaAndrukhiv Jul 11, 2023
145de05
Updated rest integration test
UlyanaAndrukhiv Jul 11, 2023
000581e
Fixed typo, updated last commit
UlyanaAndrukhiv Jul 11, 2023
a2aa5d7
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jul 12, 2023
8ca997d
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jul 14, 2023
8455b98
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jul 17, 2023
c7d1f07
Merged with master
UlyanaAndrukhiv Jul 19, 2023
ea7748f
Added part of updates according to comments
UlyanaAndrukhiv Jul 20, 2023
77ff167
Merged with master
UlyanaAndrukhiv Jul 20, 2023
4d37538
Merged with master
UlyanaAndrukhiv Jul 20, 2023
c1347be
Linted
UlyanaAndrukhiv Jul 20, 2023
36869d1
Merge branch 'master' of github.com:UlyanaAndrukhiv/flow-go into Ulya…
UlyanaAndrukhiv Jul 20, 2023
7faa9e7
Fixed last commit
UlyanaAndrukhiv Jul 20, 2023
26b435e
Updated according to comments, added rest metrics to observer node
UlyanaAndrukhiv Jul 21, 2023
d47cc09
Merge branch 'master' into UlyanaAndrukhiv/3138-rest-api-on-observers
UlyanaAndrukhiv Jul 21, 2023
c7c05b9
Merged with master
UlyanaAndrukhiv Jul 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 26 additions & 33 deletions access/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package access
import (
"context"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/signature"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"

"github.com/onflow/flow/protobuf/go/flow/access"
"github.com/onflow/flow/protobuf/go/flow/entities"
)

type Handler struct {
Expand Down Expand Up @@ -516,7 +516,7 @@ func (h *Handler) GetEventsForHeightRange(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -548,7 +548,7 @@ func (h *Handler) GetEventsForBlockIDs(
return nil, err
}

resultEvents, err := blockEventsToMessages(results)
resultEvents, err := convert.BlockEventsToMessages(results)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -590,6 +590,27 @@ func (h *Handler) GetExecutionResultForBlockID(ctx context.Context, req *access.
return executionResultToMessages(result, metadata)
}

// GetExecutionResultByID returns the execution result for the given ID.
func (h *Handler) GetExecutionResultByID(ctx context.Context, req *access.GetExecutionResultByIDRequest) (*access.ExecutionResultByIDResponse, error) {
metadata := h.buildMetadataResponse()

blockID := convert.MessageToIdentifier(req.GetId())

result, err := h.api.GetExecutionResultByID(ctx, blockID)
if err != nil {
return nil, err
}

execResult, err := convert.ExecutionResultToMessage(result)
if err != nil {
return nil, err
}
return &access.ExecutionResultByIDResponse{
ExecutionResult: execResult,
Metadata: metadata,
}, nil
}

func (h *Handler) blockResponse(block *flow.Block, fullResponse bool, status flow.BlockStatus) (*access.BlockResponse, error) {
metadata := h.buildMetadataResponse()

Expand Down Expand Up @@ -659,34 +680,6 @@ func executionResultToMessages(er *flow.ExecutionResult, metadata *entities.Meta
}, nil
}

func blockEventsToMessages(blocks []flow.BlockEvents) ([]*access.EventsResponse_Result, error) {
results := make([]*access.EventsResponse_Result, len(blocks))

for i, block := range blocks {
event, err := blockEventsToMessage(block)
if err != nil {
return nil, err
}
results[i] = event
}

return results, nil
}

func blockEventsToMessage(block flow.BlockEvents) (*access.EventsResponse_Result, error) {
eventMessages := make([]*entities.Event, len(block.Events))
for i, event := range block.Events {
eventMessages[i] = convert.EventToMessage(event)
}
timestamp := timestamppb.New(block.BlockTimestamp)
return &access.EventsResponse_Result{
BlockId: block.BlockID[:],
BlockHeight: block.BlockHeight,
BlockTimestamp: timestamp,
Events: eventMessages,
}, nil
}

// WithBlockSignerDecoder configures the Handler to decode signer indices
// via the provided hotstuff.BlockSignerDecoder
func WithBlockSignerDecoder(signerIndicesDecoder hotstuff.BlockSignerDecoder) func(*Handler) {
Expand Down
95 changes: 66 additions & 29 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/onflow/flow-go/crypto"
"github.com/onflow/flow-go/engine/access/ingestion"
pingeng "github.com/onflow/flow-go/engine/access/ping"
"github.com/onflow/flow-go/engine/access/rest"
"github.com/onflow/flow-go/engine/access/rest/routes"
"github.com/onflow/flow-go/engine/access/rpc"
"github.com/onflow/flow-go/engine/access/rpc/backend"
"github.com/onflow/flow-go/engine/access/state_stream"
Expand Down Expand Up @@ -147,20 +147,22 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
collectionGRPCPort: 9000,
executionGRPCPort: 9000,
rpcConf: rpc.Config{
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
ConnectionPoolSize: backend.DefaultConnectionPoolSize,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
UnsecureGRPCListenAddr: "0.0.0.0:9000",
SecureGRPCListenAddr: "0.0.0.0:9001",
HTTPListenAddr: "0.0.0.0:8000",
RESTListenAddr: "",
CollectionAddr: "",
HistoricalAccessAddrs: "",
BackendConfig: backend.Config{
CollectionClientTimeout: 3 * time.Second,
ExecutionClientTimeout: 3 * time.Second,
ConnectionPoolSize: backend.DefaultConnectionPoolSize,
MaxHeightRange: backend.DefaultMaxHeightRange,
PreferredExecutionNodeIDs: nil,
FixedExecutionNodeIDs: nil,
ArchiveAddressList: nil,
},
MaxMsgSize: grpcutils.DefaultMaxMsgSize,
},
stateStreamConf: state_stream.Config{
MaxExecutionDataMsgSize: grpcutils.DefaultMaxMsgSize,
Expand Down Expand Up @@ -667,15 +669,15 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVar(&builder.rpcConf.RESTListenAddr, "rest-addr", defaultConfig.rpcConf.RESTListenAddr, "the address the REST server listens on (if empty the REST server will not be started)")
flags.StringVarP(&builder.rpcConf.CollectionAddr, "static-collection-ingress-addr", "", defaultConfig.rpcConf.CollectionAddr, "the address (of the collection node) to send transactions to")
flags.StringVarP(&builder.ExecutionNodeAddress, "script-addr", "s", defaultConfig.ExecutionNodeAddress, "the address (of the execution node) forward the script to")
flags.StringSliceVar(&builder.rpcConf.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.ArchiveAddressList, "the list of address of the archive node to forward the script queries to")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.ArchiveAddressList, "archive-address-list", defaultConfig.rpcConf.BackendConfig.ArchiveAddressList, "the list of address of the archive node to forward the script queries to")
flags.StringVarP(&builder.rpcConf.HistoricalAccessAddrs, "historical-access-addr", "", defaultConfig.rpcConf.HistoricalAccessAddrs, "comma separated rpc addresses for historical access nodes")
flags.DurationVar(&builder.rpcConf.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.CollectionClientTimeout, "grpc client timeout for a collection node")
flags.DurationVar(&builder.rpcConf.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.ExecutionClientTimeout, "grpc client timeout for an execution node")
flags.UintVar(&builder.rpcConf.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.DurationVar(&builder.rpcConf.BackendConfig.CollectionClientTimeout, "collection-client-timeout", defaultConfig.rpcConf.BackendConfig.CollectionClientTimeout, "grpc client timeout for a collection node")
flags.DurationVar(&builder.rpcConf.BackendConfig.ExecutionClientTimeout, "execution-client-timeout", defaultConfig.rpcConf.BackendConfig.ExecutionClientTimeout, "grpc client timeout for an execution node")
flags.UintVar(&builder.rpcConf.BackendConfig.ConnectionPoolSize, "connection-pool-size", defaultConfig.rpcConf.BackendConfig.ConnectionPoolSize, "maximum number of connections allowed in the connection pool, size of 0 disables the connection pooling, and anything less than the default size will be overridden to use the default size")
flags.UintVar(&builder.rpcConf.MaxMsgSize, "rpc-max-message-size", grpcutils.DefaultMaxMsgSize, "the maximum message size in bytes for messages sent or received over grpc")
flags.UintVar(&builder.rpcConf.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.MaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&builder.rpcConf.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&builder.rpcConf.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.UintVar(&builder.rpcConf.BackendConfig.MaxHeightRange, "rpc-max-height-range", defaultConfig.rpcConf.BackendConfig.MaxHeightRange, "maximum size for height range requests")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "preferred-execution-node-ids", defaultConfig.rpcConf.BackendConfig.PreferredExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.StringSliceVar(&builder.rpcConf.BackendConfig.FixedExecutionNodeIDs, "fixed-execution-node-ids", defaultConfig.rpcConf.BackendConfig.FixedExecutionNodeIDs, "comma separated list of execution nodes ids to choose from when making an upstream call if no matching preferred execution id is found e.g. b4a4dbdcd443d...,fb386a6a... etc.")
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed")
flags.BoolVar(&builder.logTxTimeToFinalizedExecuted, "log-tx-time-to-finalized-executed", defaultConfig.logTxTimeToFinalizedExecuted, "log transaction time to finalized and executed")
Expand Down Expand Up @@ -915,7 +917,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.rpcConf.CollectionAddr,
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(int(builder.rpcConf.MaxMsgSize))),
grpc.WithTransportCredentials(insecure.NewCredentials()),
backend.WithClientUnaryInterceptor(builder.rpcConf.CollectionClientTimeout))
backend.WithClientUnaryInterceptor(builder.rpcConf.BackendConfig.CollectionClientTimeout))
if err != nil {
return err
}
Expand Down Expand Up @@ -972,7 +974,7 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Module("rest metrics", func(node *cmd.NodeConfig) error {
m, err := metrics.NewRestCollector(rest.URLToRoute, node.MetricsRegisterer)
m, err := metrics.NewRestCollector(routes.URLToRoute, node.MetricsRegisterer)
if err != nil {
return err
}
Expand Down Expand Up @@ -1034,10 +1036,31 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engineBuilder, err := rpc.NewBuilder(
node.Logger,
config := builder.rpcConf
backendConfig := config.BackendConfig
accessMetrics := builder.AccessMetrics

backendCache, cacheSize, err := backend.NewCache(node.Logger,
accessMetrics,
backendConfig.ConnectionPoolSize)
if err != nil {
return nil, fmt.Errorf("could not initialize backend cache: %w", err)
}

connFactory := &backend.ConnectionFactoryImpl{
CollectionGRPCPort: builder.collectionGRPCPort,
ExecutionGRPCPort: builder.executionGRPCPort,
CollectionNodeGRPCTimeout: backendConfig.CollectionClientTimeout,
ExecutionNodeGRPCTimeout: backendConfig.ExecutionClientTimeout,
ConnectionsCache: backendCache,
CacheSize: cacheSize,
MaxMsgSize: config.MaxMsgSize,
AccessMetrics: accessMetrics,
Log: node.Logger,
}

backend := backend.New(
node.State,
builder.rpcConf,
builder.CollectionRPC,
builder.HistoricalAccessRPCs,
node.Storage.Blocks,
Expand All @@ -1048,11 +1071,25 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
node.Storage.Results,
node.RootChainID,
builder.AccessMetrics,
builder.collectionGRPCPort,
builder.executionGRPCPort,
connFactory,
builder.retryEnabled,
backendConfig.MaxHeightRange,
backendConfig.PreferredExecutionNodeIDs,
backendConfig.FixedExecutionNodeIDs,
node.Logger,
backend.DefaultSnapshotHistoryLimit,
backendConfig.ArchiveAddressList)

engineBuilder, err := rpc.NewBuilder(
node.Logger,
node.State,
config,
node.RootChainID,
builder.AccessMetrics,
builder.rpcMetricsEnabled,
builder.Me,
backend,
backend,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
Expand Down
Loading