Skip to content

Commit

Permalink
Merge #4411
Browse files Browse the repository at this point in the history
4411:  [Access] Refactor Access RPC engines to support a single gRPC port #4217 r=durkmurder a=UlyanaAndrukhiv

#4217

### Context

This pull request 
* creates a new grpcserver module which includes a GrpcServer and a GrpcServerBuilder for it. GrpcServer defines a grpc server that starts once and uses in different Engines
* refactored the Access RPC engine and state stream engine setup using the same grps server. Also small refactored Observer node's engine setup
* refactored tests according to new changes

Co-authored-by: UlyanaAndrukhiv <u.andrukhiv@gmail.com>
Co-authored-by: Uliana Andrukhiv <u.andrukhiv@gmail.com>
  • Loading branch information
bors[bot] and UlyanaAndrukhiv authored Jul 21, 2023
2 parents ca27c87 + d6eb702 commit b230700
Show file tree
Hide file tree
Showing 11 changed files with 757 additions and 215 deletions.
59 changes: 55 additions & 4 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/mempool/herocache"
"github.com/onflow/flow-go/module/mempool/stdmap"
Expand Down Expand Up @@ -241,6 +242,11 @@ type FlowAccessNodeBuilder struct {
FollowerEng *followereng.ComplianceEngine
SyncEng *synceng.Engine
StateStreamEng *state_stream.Engine

// grpc servers
secureGrpcServer *grpcserver.GrpcServer
unsecureGrpcServer *grpcserver.GrpcServer
stateStreamGrpcServer *grpcserver.GrpcServer
}

func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder {
Expand Down Expand Up @@ -613,8 +619,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN
node.RootChainID,
builder.executionDataConfig.InitialBlockHeight,
highestAvailableHeight,
builder.apiRatelimits,
builder.apiBurstlimits,
builder.stateStreamGrpcServer,
)
if err != nil {
return nil, fmt.Errorf("could not create state stream engine: %w", err)
Expand Down Expand Up @@ -996,6 +1001,38 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.rpcConf.TransportCredentials = credentials.NewTLS(tlsConfig)
return nil
}).
Module("creating grpc servers", func(node *cmd.NodeConfig) error {
builder.secureGrpcServer = grpcserver.NewGrpcServerBuilder(
node.Logger,
builder.rpcConf.SecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build()

builder.stateStreamGrpcServer = grpcserver.NewGrpcServerBuilder(
node.Logger,
builder.stateStreamConf.ListenAddr,
builder.stateStreamConf.MaxExecutionDataMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
grpcserver.WithStreamInterceptor()).Build()

if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr {
builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger,
builder.rpcConf.UnsecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits).Build()
} else {
builder.unsecureGrpcServer = builder.stateStreamGrpcServer
}

return nil
}).
Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engineBuilder, err := rpc.NewBuilder(
node.Logger,
Expand All @@ -1015,9 +1052,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.executionGRPCPort,
builder.retryEnabled,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
builder.Me,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1107,6 +1144,20 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) {
builder.BuildExecutionDataRequester()
}

builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})

builder.Component("state stream unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.stateStreamGrpcServer, nil
})

if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr {
builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.unsecureGrpcServer, nil
})
}

builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
ping, err := pingeng.New(
node.Logger,
Expand Down
37 changes: 35 additions & 2 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/chainsync"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
"github.com/onflow/flow-go/module/local"
"github.com/onflow/flow-go/module/metrics"
Expand Down Expand Up @@ -162,6 +163,10 @@ type ObserverServiceBuilder struct {

// Public network
peerID peer.ID

// grpc servers
secureGrpcServer *grpcserver.GrpcServer
unsecureGrpcServer *grpcserver.GrpcServer
}

// deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters.
Expand Down Expand Up @@ -844,6 +849,24 @@ func (builder *ObserverServiceBuilder) enqueueConnectWithStakedAN() {
}

func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.Module("creating grpc servers", func(node *cmd.NodeConfig) error {
builder.secureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger,
builder.rpcConf.SecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build()

builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger,
builder.rpcConf.UnsecureGRPCListenAddr,
builder.rpcConf.MaxMsgSize,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits).Build()

return nil
})
builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
engineBuilder, err := rpc.NewBuilder(
node.Logger,
Expand All @@ -863,9 +886,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
0,
false,
builder.rpcMetricsEnabled,
builder.apiRatelimits,
builder.apiBurstlimits,
builder.Me,
builder.secureGrpcServer,
builder.unsecureGrpcServer,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -900,6 +923,16 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() {
builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock)
return builder.RpcEng, nil
})

// build secure grpc server
builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.secureGrpcServer, nil
})

// build unsecure grpc server
builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
return builder.unsecureGrpcServer, nil
})
}

// initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update
Expand Down
Loading

0 comments on commit b230700

Please sign in to comment.