diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index c1b59dd5681..a39cf32f350 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -52,6 +52,7 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/id" "github.com/onflow/flow-go/module/mempool/herocache" "github.com/onflow/flow-go/module/mempool/stdmap" @@ -241,6 +242,11 @@ type FlowAccessNodeBuilder struct { FollowerEng *followereng.ComplianceEngine SyncEng *synceng.Engine StateStreamEng *state_stream.Engine + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer + stateStreamGrpcServer *grpcserver.GrpcServer } func (builder *FlowAccessNodeBuilder) buildFollowerState() *FlowAccessNodeBuilder { @@ -613,8 +619,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionDataRequester() *FlowAccessN node.RootChainID, builder.executionDataConfig.InitialBlockHeight, highestAvailableHeight, - builder.apiRatelimits, - builder.apiBurstlimits, + builder.stateStreamGrpcServer, ) if err != nil { return nil, fmt.Errorf("could not create state stream engine: %w", err) @@ -996,6 +1001,38 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.rpcConf.TransportCredentials = credentials.NewTLS(tlsConfig) return nil }). + Module("creating grpc servers", func(node *cmd.NodeConfig) error { + builder.secureGrpcServer = grpcserver.NewGrpcServerBuilder( + node.Logger, + builder.rpcConf.SecureGRPCListenAddr, + builder.rpcConf.MaxMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits, + grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build() + + builder.stateStreamGrpcServer = grpcserver.NewGrpcServerBuilder( + node.Logger, + builder.stateStreamConf.ListenAddr, + builder.stateStreamConf.MaxExecutionDataMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits, + grpcserver.WithStreamInterceptor()).Build() + + if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr { + builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger, + builder.rpcConf.UnsecureGRPCListenAddr, + builder.rpcConf.MaxMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits).Build() + } else { + builder.unsecureGrpcServer = builder.stateStreamGrpcServer + } + + return nil + }). Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { engineBuilder, err := rpc.NewBuilder( node.Logger, @@ -1015,9 +1052,9 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.executionGRPCPort, builder.retryEnabled, builder.rpcMetricsEnabled, - builder.apiRatelimits, - builder.apiBurstlimits, builder.Me, + builder.secureGrpcServer, + builder.unsecureGrpcServer, ) if err != nil { return nil, err @@ -1107,6 +1144,20 @@ func (builder *FlowAccessNodeBuilder) Build() (cmd.Node, error) { builder.BuildExecutionDataRequester() } + builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.secureGrpcServer, nil + }) + + builder.Component("state stream unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.stateStreamGrpcServer, nil + }) + + if builder.rpcConf.UnsecureGRPCListenAddr != builder.stateStreamConf.ListenAddr { + builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.unsecureGrpcServer, nil + }) + } + builder.Component("ping engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { ping, err := pingeng.New( node.Logger, diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 78ddc464fb7..36343507a8f 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -39,6 +39,7 @@ import ( "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/chainsync" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/id" "github.com/onflow/flow-go/module/local" "github.com/onflow/flow-go/module/metrics" @@ -162,6 +163,10 @@ type ObserverServiceBuilder struct { // Public network peerID peer.ID + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer } // deriveBootstrapPeerIdentities derives the Flow Identity of the bootstrap peers from the parameters. @@ -844,6 +849,24 @@ func (builder *ObserverServiceBuilder) enqueueConnectWithStakedAN() { } func (builder *ObserverServiceBuilder) enqueueRPCServer() { + builder.Module("creating grpc servers", func(node *cmd.NodeConfig) error { + builder.secureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger, + builder.rpcConf.SecureGRPCListenAddr, + builder.rpcConf.MaxMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits, + grpcserver.WithTransportCredentials(builder.rpcConf.TransportCredentials)).Build() + + builder.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(node.Logger, + builder.rpcConf.UnsecureGRPCListenAddr, + builder.rpcConf.MaxMsgSize, + builder.rpcMetricsEnabled, + builder.apiRatelimits, + builder.apiBurstlimits).Build() + + return nil + }) builder.Component("RPC engine", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { engineBuilder, err := rpc.NewBuilder( node.Logger, @@ -863,9 +886,9 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { 0, false, builder.rpcMetricsEnabled, - builder.apiRatelimits, - builder.apiBurstlimits, builder.Me, + builder.secureGrpcServer, + builder.unsecureGrpcServer, ) if err != nil { return nil, err @@ -900,6 +923,16 @@ func (builder *ObserverServiceBuilder) enqueueRPCServer() { builder.FollowerDistributor.AddOnBlockFinalizedConsumer(builder.RpcEng.OnFinalizedBlock) return builder.RpcEng, nil }) + + // build secure grpc server + builder.Component("secure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.secureGrpcServer, nil + }) + + // build unsecure grpc server + builder.Component("unsecure grpc server", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + return builder.unsecureGrpcServer, nil + }) } // initMiddleware creates the network.Middleware implementation with the libp2p factory function, metrics, peer update diff --git a/engine/access/integration_unsecure_grpc_server_test.go b/engine/access/integration_unsecure_grpc_server_test.go new file mode 100644 index 00000000000..e5fa9dbe4c9 --- /dev/null +++ b/engine/access/integration_unsecure_grpc_server_test.go @@ -0,0 +1,300 @@ +package access + +import ( + "context" + "io" + "os" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + "github.com/onflow/flow-go/engine" + accessmock "github.com/onflow/flow-go/engine/access/mock" + "github.com/onflow/flow-go/engine/access/rpc" + "github.com/onflow/flow-go/engine/access/state_stream" + "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/blobs" + "github.com/onflow/flow-go/module/executiondatasync/execution_data" + "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/grpcserver" + "github.com/onflow/flow-go/module/irrecoverable" + "github.com/onflow/flow-go/module/mempool/herocache" + "github.com/onflow/flow-go/module/metrics" + module "github.com/onflow/flow-go/module/mock" + "github.com/onflow/flow-go/network" + protocol "github.com/onflow/flow-go/state/protocol/mock" + "github.com/onflow/flow-go/storage" + storagemock "github.com/onflow/flow-go/storage/mock" + "github.com/onflow/flow-go/utils/grpcutils" + "github.com/onflow/flow-go/utils/unittest" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" + executiondataproto "github.com/onflow/flow/protobuf/go/flow/executiondata" +) + +// SameGRPCPortTestSuite verifies both AccessAPI and ExecutionDataAPI client continue to work when configured +// on the same port +type SameGRPCPortTestSuite struct { + suite.Suite + state *protocol.State + snapshot *protocol.Snapshot + epochQuery *protocol.EpochQuery + log zerolog.Logger + net *network.Network + request *module.Requester + collClient *accessmock.AccessAPIClient + execClient *accessmock.ExecutionAPIClient + me *module.Local + chainID flow.ChainID + metrics *metrics.NoopCollector + rpcEng *rpc.Engine + stateStreamEng *state_stream.Engine + + // storage + blocks *storagemock.Blocks + headers *storagemock.Headers + collections *storagemock.Collections + transactions *storagemock.Transactions + receipts *storagemock.ExecutionReceipts + seals *storagemock.Seals + results *storagemock.ExecutionResults + + ctx irrecoverable.SignalerContext + cancel context.CancelFunc + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer + + bs blobs.Blobstore + eds execution_data.ExecutionDataStore + broadcaster *engine.Broadcaster + execDataCache *cache.ExecutionDataCache + execDataHeroCache *herocache.BlockExecutionData + + blockMap map[uint64]*flow.Block +} + +func (suite *SameGRPCPortTestSuite) SetupTest() { + suite.log = zerolog.New(os.Stdout) + suite.net = new(network.Network) + suite.state = new(protocol.State) + suite.snapshot = new(protocol.Snapshot) + + suite.epochQuery = new(protocol.EpochQuery) + suite.state.On("Sealed").Return(suite.snapshot, nil).Maybe() + suite.state.On("Final").Return(suite.snapshot, nil).Maybe() + suite.snapshot.On("Epochs").Return(suite.epochQuery).Maybe() + suite.blocks = new(storagemock.Blocks) + suite.headers = new(storagemock.Headers) + suite.transactions = new(storagemock.Transactions) + suite.collections = new(storagemock.Collections) + suite.receipts = new(storagemock.ExecutionReceipts) + suite.results = new(storagemock.ExecutionResults) + suite.seals = new(storagemock.Seals) + + suite.collClient = new(accessmock.AccessAPIClient) + suite.execClient = new(accessmock.ExecutionAPIClient) + + suite.request = new(module.Requester) + suite.request.On("EntityByID", mock.Anything, mock.Anything) + + suite.me = new(module.Local) + suite.eds = execution_data.NewExecutionDataStore(suite.bs, execution_data.DefaultSerializer) + + suite.broadcaster = engine.NewBroadcaster() + + suite.execDataHeroCache = herocache.NewBlockExecutionData(state_stream.DefaultCacheSize, suite.log, metrics.NewNoopCollector()) + suite.execDataCache = cache.NewExecutionDataCache(suite.eds, suite.headers, suite.seals, suite.results, suite.execDataHeroCache) + + accessIdentity := unittest.IdentityFixture(unittest.WithRole(flow.RoleAccess)) + suite.me. + On("NodeID"). + Return(accessIdentity.NodeID) + + suite.chainID = flow.Testnet + suite.metrics = metrics.NewNoopCollector() + + config := rpc.Config{ + UnsecureGRPCListenAddr: unittest.DefaultAddress, + SecureGRPCListenAddr: unittest.DefaultAddress, + HTTPListenAddr: unittest.DefaultAddress, + } + + blockCount := 5 + suite.blockMap = make(map[uint64]*flow.Block, blockCount) + // generate blockCount consecutive blocks with associated seal, result and execution data + rootBlock := unittest.BlockFixture() + parent := rootBlock.Header + suite.blockMap[rootBlock.Header.Height] = &rootBlock + + for i := 0; i < blockCount; i++ { + block := unittest.BlockWithParentFixture(parent) + suite.blockMap[block.Header.Height] = block + } + + // generate a server certificate that will be served by the GRPC server + networkingKey := unittest.NetworkingPrivKeyFixture() + x509Certificate, err := grpcutils.X509Certificate(networkingKey) + assert.NoError(suite.T(), err) + tlsConfig := grpcutils.DefaultServerTLSConfig(x509Certificate) + // set the transport credentials for the server to use + config.TransportCredentials = credentials.NewTLS(tlsConfig) + + suite.secureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.SecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil, + grpcserver.WithTransportCredentials(config.TransportCredentials)).Build() + + suite.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.UnsecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil).Build() + + block := unittest.BlockHeaderFixture() + suite.snapshot.On("Head").Return(block, nil) + + // create rpc engine builder + rpcEngBuilder, err := rpc.NewBuilder( + suite.log, + suite.state, + config, + suite.collClient, + nil, + suite.blocks, + suite.headers, + suite.collections, + suite.transactions, + nil, + nil, + suite.chainID, + suite.metrics, + 0, + 0, + false, + false, + suite.me, + suite.secureGrpcServer, + suite.unsecureGrpcServer, + ) + assert.NoError(suite.T(), err) + suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() + assert.NoError(suite.T(), err) + suite.ctx, suite.cancel = irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + + suite.headers.On("BlockIDByHeight", mock.AnythingOfType("uint64")).Return( + func(height uint64) flow.Identifier { + if block, ok := suite.blockMap[height]; ok { + return block.Header.ID() + } + return flow.ZeroID + }, + func(height uint64) error { + if _, ok := suite.blockMap[height]; ok { + return nil + } + return storage.ErrNotFound + }, + ).Maybe() + + conf := state_stream.Config{ + ClientSendTimeout: state_stream.DefaultSendTimeout, + ClientSendBufferSize: state_stream.DefaultSendBufferSize, + } + + // create state stream engine + suite.stateStreamEng, err = state_stream.NewEng( + suite.log, + conf, + nil, + suite.execDataCache, + suite.state, + suite.headers, + suite.seals, + suite.results, + suite.chainID, + rootBlock.Header.Height, + rootBlock.Header.Height, + suite.unsecureGrpcServer, + ) + assert.NoError(suite.T(), err) + + suite.rpcEng.Start(suite.ctx) + suite.stateStreamEng.Start(suite.ctx) + + suite.secureGrpcServer.Start(suite.ctx) + suite.unsecureGrpcServer.Start(suite.ctx) + + // wait for the servers to startup + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Ready(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Ready(), 2*time.Second) + + // wait for the rpc engine to startup + unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) + // wait for the state stream engine to startup + unittest.AssertClosesBefore(suite.T(), suite.stateStreamEng.Ready(), 2*time.Second) +} + +// TestEnginesOnTheSameGrpcPort verifies if both AccessAPI and ExecutionDataAPI client successfully connect and continue +// to work when configured on the same port +func (suite *SameGRPCPortTestSuite) TestEnginesOnTheSameGrpcPort() { + ctx := context.Background() + + conn, err := grpc.Dial( + suite.unsecureGrpcServer.GRPCAddress().String(), + grpc.WithTransportCredentials(insecure.NewCredentials())) + assert.NoError(suite.T(), err) + closer := io.Closer(conn) + + suite.Run("happy path - grpc access api client can connect successfully", func() { + req := &accessproto.GetNetworkParametersRequest{} + + // expect 2 upstream calls + suite.execClient.On("GetNetworkParameters", mock.Anything, mock.Anything).Return(nil, nil).Twice() + suite.collClient.On("GetNetworkParameters", mock.Anything, mock.Anything).Return(nil, nil).Twice() + + client := suite.unsecureAccessAPIClient(conn) + + _, err := client.GetNetworkParameters(ctx, req) + assert.NoError(suite.T(), err, "failed to get network") + }) + + suite.Run("happy path - grpc execution data api client can connect successfully", func() { + req := &executiondataproto.SubscribeEventsRequest{} + + client := suite.unsecureExecutionDataAPIClient(conn) + + _, err := client.SubscribeEvents(ctx, req) + assert.NoError(suite.T(), err, "failed to subscribe events") + }) + defer closer.Close() +} + +func TestSameGRPCTestSuite(t *testing.T) { + suite.Run(t, new(SameGRPCPortTestSuite)) +} + +// unsecureAccessAPIClient creates an unsecure grpc AccessAPI client +func (suite *SameGRPCPortTestSuite) unsecureAccessAPIClient(conn *grpc.ClientConn) accessproto.AccessAPIClient { + client := accessproto.NewAccessAPIClient(conn) + return client +} + +// unsecureExecutionDataAPIClient creates an unsecure ExecutionDataAPI client +func (suite *SameGRPCPortTestSuite) unsecureExecutionDataAPIClient(conn *grpc.ClientConn) executiondataproto.ExecutionDataAPIClient { + client := executiondataproto.NewExecutionDataAPIClient(conn) + return client +} diff --git a/engine/access/rest_api_test.go b/engine/access/rest_api_test.go index 5ee8f6d9730..8cab886605c 100644 --- a/engine/access/rest_api_test.go +++ b/engine/access/rest_api_test.go @@ -10,6 +10,11 @@ import ( "testing" "time" + "google.golang.org/grpc/credentials" + + "github.com/onflow/flow-go/module/grpcserver" + "github.com/onflow/flow-go/utils/grpcutils" + "github.com/antihax/optional" restclient "github.com/onflow/flow/openapi/go-client-generated" "github.com/rs/zerolog" @@ -63,6 +68,10 @@ type RestAPITestSuite struct { ctx irrecoverable.SignalerContext cancel context.CancelFunc + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer } func (suite *RestAPITestSuite) SetupTest() { @@ -118,6 +127,29 @@ func (suite *RestAPITestSuite) SetupTest() { RESTListenAddr: unittest.DefaultAddress, } + // generate a server certificate that will be served by the GRPC server + networkingKey := unittest.NetworkingPrivKeyFixture() + x509Certificate, err := grpcutils.X509Certificate(networkingKey) + assert.NoError(suite.T(), err) + tlsConfig := grpcutils.DefaultServerTLSConfig(x509Certificate) + // set the transport credentials for the server to use + config.TransportCredentials = credentials.NewTLS(tlsConfig) + + suite.secureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.SecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil, + grpcserver.WithTransportCredentials(config.TransportCredentials)).Build() + + suite.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.UnsecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil).Build() + rpcEngBuilder, err := rpc.NewBuilder( suite.log, suite.state, @@ -136,22 +168,33 @@ func (suite *RestAPITestSuite) SetupTest() { 0, false, false, - nil, - nil, suite.me, + suite.secureGrpcServer, + suite.unsecureGrpcServer, ) assert.NoError(suite.T(), err) suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() assert.NoError(suite.T(), err) suite.ctx, suite.cancel = irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.rpcEng.Start(suite.ctx) - // wait for the server to startup + + suite.secureGrpcServer.Start(suite.ctx) + suite.unsecureGrpcServer.Start(suite.ctx) + + // wait for the servers to startup + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Ready(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Ready(), 2*time.Second) + + // wait for the engine to startup unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) } func (suite *RestAPITestSuite) TearDownTest() { suite.cancel() + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Done(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Done(), 2*time.Second) unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Done(), 2*time.Second) } diff --git a/engine/access/rpc/engine.go b/engine/access/rpc/engine.go index d4c812df997..bfbeb07aa53 100644 --- a/engine/access/rpc/engine.go +++ b/engine/access/rpc/engine.go @@ -9,21 +9,19 @@ import ( "sync" "time" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" lru "github.com/hashicorp/golang-lru" accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" - "google.golang.org/grpc" "google.golang.org/grpc/credentials" "github.com/onflow/flow-go/consensus/hotstuff/model" "github.com/onflow/flow-go/engine/access/rest" "github.com/onflow/flow-go/engine/access/rpc/backend" - "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/events" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" @@ -46,7 +44,7 @@ type Config struct { ConnectionPoolSize uint // size of the cache for storing collection and execution connections MaxHeightRange uint // max size of height range requests PreferredExecutionNodeIDs []string // preferred list of upstream execution node IDs - FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node node ID can be chosen from the PreferredExecutionNodeIDs + FixedExecutionNodeIDs []string // fixed list of execution node IDs to choose from if no node ID can be chosen from the PreferredExecutionNodeIDs ArchiveAddressList []string // the archive node address list to send script executions. when configured, script executions will be all sent to the archive node } @@ -62,18 +60,16 @@ type Engine struct { log zerolog.Logger restCollector module.RestMetrics - backend *backend.Backend // the gRPC service implementation - unsecureGrpcServer *grpc.Server // the unsecure gRPC server - secureGrpcServer *grpc.Server // the secure gRPC server + backend *backend.Backend // the gRPC service implementation + unsecureGrpcServer *grpcserver.GrpcServer // the unsecure gRPC server + secureGrpcServer *grpcserver.GrpcServer // the secure gRPC server httpServer *http.Server restServer *http.Server config Config chain flow.Chain - addrLock sync.RWMutex - unsecureGrpcAddress net.Addr - secureGrpcAddress net.Addr - restAPIAddress net.Addr + addrLock sync.RWMutex + restAPIAddress net.Addr } // NewBuilder returns a new RPC engine builder. @@ -94,48 +90,15 @@ func NewBuilder(log zerolog.Logger, executionGRPCPort uint, retryEnabled bool, rpcMetricsEnabled bool, - apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the Access API e.g. Ping->100, GetTransaction->300 - apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the Access API e.g. Ping->50, GetTransaction->10 me module.Local, + secureGrpcServer *grpcserver.GrpcServer, + unsecureGrpcServer *grpcserver.GrpcServer, ) (*RPCEngineBuilder, error) { log = log.With().Str("engine", "rpc").Logger() - // create a GRPC server to serve GRPC clients - grpcOpts := []grpc.ServerOption{ - grpc.MaxRecvMsgSize(int(config.MaxMsgSize)), - grpc.MaxSendMsgSize(int(config.MaxMsgSize)), - } - - var interceptors []grpc.UnaryServerInterceptor // ordered list of interceptors - // if rpc metrics is enabled, first create the grpc metrics interceptor - if rpcMetricsEnabled { - interceptors = append(interceptors, grpc_prometheus.UnaryServerInterceptor) - } - - if len(apiRatelimits) > 0 { - // create a rate limit interceptor - rateLimitInterceptor := rpc.NewRateLimiterInterceptor(log, apiRatelimits, apiBurstLimits).UnaryServerInterceptor - // append the rate limit interceptor to the list of interceptors - interceptors = append(interceptors, rateLimitInterceptor) - } - - // add the logging interceptor, ensure it is innermost wrapper - interceptors = append(interceptors, rpc.LoggingInterceptor(log)...) - - // create a chained unary interceptor - chainedInterceptors := grpc.ChainUnaryInterceptor(interceptors...) - grpcOpts = append(grpcOpts, chainedInterceptors) - - // create an unsecured grpc server - unsecureGrpcServer := grpc.NewServer(grpcOpts...) - - // create a secure server by using the secure grpc credentials that are passed in as part of config - grpcOpts = append(grpcOpts, grpc.Creds(config.TransportCredentials)) - secureGrpcServer := grpc.NewServer(grpcOpts...) - // wrap the unsecured server with an HTTP proxy server to serve HTTP clients - httpServer := newHTTPProxyServer(unsecureGrpcServer) + httpServer := newHTTPProxyServer(unsecureGrpcServer.Server) var cache *lru.Cache cacheSize := config.ConnectionPoolSize @@ -215,8 +178,14 @@ func NewBuilder(log zerolog.Logger, eng.backendNotifierActor = backendNotifierActor eng.Component = component.NewComponentManagerBuilder(). - AddWorker(eng.serveUnsecureGRPCWorker). - AddWorker(eng.serveSecureGRPCWorker). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + <-secureGrpcServer.Done() + }). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + <-unsecureGrpcServer.Done() + }). AddWorker(eng.serveGRPCWebProxyWorker). AddWorker(eng.serveREST). AddWorker(finalizedCacheWorker). @@ -245,8 +214,6 @@ func (e *Engine) shutdown() { // use unbounded context, rely on shutdown logic to have timeout ctx := context.Background() - e.unsecureGrpcServer.GracefulStop() - e.secureGrpcServer.GracefulStop() err := e.httpServer.Shutdown(ctx) if err != nil { e.log.Error().Err(err).Msg("error stopping http server") @@ -273,22 +240,6 @@ func (e *Engine) notifyBackendOnBlockFinalized(_ *model.Block) error { return nil } -// UnsecureGRPCAddress returns the listen address of the unsecure GRPC server. -// Guaranteed to be non-nil after Engine.Ready is closed. -func (e *Engine) UnsecureGRPCAddress() net.Addr { - e.addrLock.RLock() - defer e.addrLock.RUnlock() - return e.unsecureGrpcAddress -} - -// SecureGRPCAddress returns the listen address of the secure GRPC server. -// Guaranteed to be non-nil after Engine.Ready is closed. -func (e *Engine) SecureGRPCAddress() net.Addr { - e.addrLock.RLock() - defer e.addrLock.RUnlock() - return e.secureGrpcAddress -} - // RestApiAddress returns the listen address of the REST API server. // Guaranteed to be non-nil after Engine.Ready is closed. func (e *Engine) RestApiAddress() net.Addr { @@ -297,59 +248,6 @@ func (e *Engine) RestApiAddress() net.Addr { return e.restAPIAddress } -// serveUnsecureGRPCWorker is a worker routine which starts the unsecure gRPC server. -// The ready callback is called after the server address is bound and set. -func (e *Engine) serveUnsecureGRPCWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - e.log.Info().Str("grpc_address", e.config.UnsecureGRPCListenAddr).Msg("starting grpc server on address") - - l, err := net.Listen("tcp", e.config.UnsecureGRPCListenAddr) - if err != nil { - e.log.Err(err).Msg("failed to start the grpc server") - ctx.Throw(err) - return - } - - // save the actual address on which we are listening (may be different from e.config.UnsecureGRPCListenAddr if not port - // was specified) - e.addrLock.Lock() - e.unsecureGrpcAddress = l.Addr() - e.addrLock.Unlock() - e.log.Debug().Str("unsecure_grpc_address", e.unsecureGrpcAddress.String()).Msg("listening on port") - ready() - - err = e.unsecureGrpcServer.Serve(l) // blocking call - if err != nil { - e.log.Err(err).Msg("fatal error in unsecure grpc server") - ctx.Throw(err) - } -} - -// serveSecureGRPCWorker is a worker routine which starts the secure gRPC server. -// The ready callback is called after the server address is bound and set. -func (e *Engine) serveSecureGRPCWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - e.log.Info().Str("secure_grpc_address", e.config.SecureGRPCListenAddr).Msg("starting grpc server on address") - - l, err := net.Listen("tcp", e.config.SecureGRPCListenAddr) - if err != nil { - e.log.Err(err).Msg("failed to start the grpc server") - ctx.Throw(err) - return - } - - e.addrLock.Lock() - e.secureGrpcAddress = l.Addr() - e.addrLock.Unlock() - - e.log.Debug().Str("secure_grpc_address", e.secureGrpcAddress.String()).Msg("listening on port") - ready() - - err = e.secureGrpcServer.Serve(l) // blocking call - if err != nil { - e.log.Err(err).Msg("fatal error in secure grpc server") - ctx.Throw(err) - } -} - // serveGRPCWebProxyWorker is a worker routine which starts the gRPC web proxy server. func (e *Engine) serveGRPCWebProxyWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { log := e.log.With().Str("http_proxy_address", e.config.HTTPListenAddr).Logger() diff --git a/engine/access/rpc/engine_builder.go b/engine/access/rpc/engine_builder.go index a4694547b03..a2630d26df8 100644 --- a/engine/access/rpc/engine_builder.go +++ b/engine/access/rpc/engine_builder.go @@ -68,11 +68,11 @@ func (builder *RPCEngineBuilder) WithNewHandler(handler accessproto.AccessAPISer func (builder *RPCEngineBuilder) WithLegacy() *RPCEngineBuilder { // Register legacy gRPC handlers for backwards compatibility, to be removed at a later date legacyaccessproto.RegisterAccessAPIServer( - builder.unsecureGrpcServer, + builder.unsecureGrpcServer.Server, legacyaccess.NewHandler(builder.backend, builder.chain), ) legacyaccessproto.RegisterAccessAPIServer( - builder.secureGrpcServer, + builder.secureGrpcServer.Server, legacyaccess.NewHandler(builder.backend, builder.chain), ) return builder @@ -83,8 +83,8 @@ func (builder *RPCEngineBuilder) WithLegacy() *RPCEngineBuilder { func (builder *RPCEngineBuilder) WithMetrics() *RPCEngineBuilder { // Not interested in legacy metrics, so initialize here grpc_prometheus.EnableHandlingTimeHistogram() - grpc_prometheus.Register(builder.unsecureGrpcServer) - grpc_prometheus.Register(builder.secureGrpcServer) + grpc_prometheus.Register(builder.unsecureGrpcServer.Server) + grpc_prometheus.Register(builder.secureGrpcServer.Server) return builder } @@ -100,7 +100,7 @@ func (builder *RPCEngineBuilder) Build() (*Engine, error) { handler = access.NewHandler(builder.Engine.backend, builder.Engine.chain, builder.finalizedHeaderCache, builder.me, access.WithBlockSignerDecoder(builder.signerIndicesDecoder)) } } - accessproto.RegisterAccessAPIServer(builder.unsecureGrpcServer, handler) - accessproto.RegisterAccessAPIServer(builder.secureGrpcServer, handler) + accessproto.RegisterAccessAPIServer(builder.unsecureGrpcServer.Server, handler) + accessproto.RegisterAccessAPIServer(builder.secureGrpcServer.Server, handler) return builder.Engine, nil } diff --git a/engine/access/rpc/rate_limit_test.go b/engine/access/rpc/rate_limit_test.go index 3cce6e97fda..3fce4aeded0 100644 --- a/engine/access/rpc/rate_limit_test.go +++ b/engine/access/rpc/rate_limit_test.go @@ -16,11 +16,13 @@ import ( "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" @@ -61,6 +63,10 @@ type RateLimitTestSuite struct { ctx irrecoverable.SignalerContext cancel context.CancelFunc + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer } func (suite *RateLimitTestSuite) SetupTest() { @@ -101,6 +107,14 @@ func (suite *RateLimitTestSuite) SetupTest() { HTTPListenAddr: unittest.DefaultAddress, } + // generate a server certificate that will be served by the GRPC server + networkingKey := unittest.NetworkingPrivKeyFixture() + x509Certificate, err := grpcutils.X509Certificate(networkingKey) + assert.NoError(suite.T(), err) + tlsConfig := grpcutils.DefaultServerTLSConfig(x509Certificate) + // set the transport credentials for the server to use + config.TransportCredentials = credentials.NewTLS(tlsConfig) + // set the rate limit to test with suite.rateLimit = 2 // set the burst limit to test with @@ -114,21 +128,45 @@ func (suite *RateLimitTestSuite) SetupTest() { "Ping": suite.rateLimit, } + suite.secureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.SecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + apiRateLimt, + apiBurstLimt, + grpcserver.WithTransportCredentials(config.TransportCredentials)).Build() + + suite.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.UnsecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + apiRateLimt, + apiBurstLimt).Build() + block := unittest.BlockHeaderFixture() suite.snapshot.On("Head").Return(block, nil) rpcEngBuilder, err := NewBuilder(suite.log, suite.state, config, suite.collClient, nil, suite.blocks, suite.headers, suite.collections, suite.transactions, nil, - nil, suite.chainID, suite.metrics, 0, 0, false, false, apiRateLimt, apiBurstLimt, suite.me) + nil, suite.chainID, suite.metrics, 0, 0, false, false, suite.me, suite.secureGrpcServer, suite.unsecureGrpcServer) require.NoError(suite.T(), err) suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() require.NoError(suite.T(), err) suite.ctx, suite.cancel = irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.rpcEng.Start(suite.ctx) - // wait for the server to startup + + suite.secureGrpcServer.Start(suite.ctx) + suite.unsecureGrpcServer.Start(suite.ctx) + + // wait for the servers to startup + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Ready(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Ready(), 2*time.Second) + + // wait for the engine to startup unittest.RequireCloseBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second, "engine not ready at startup") // create the access api client - suite.client, suite.closer, err = accessAPIClient(suite.rpcEng.UnsecureGRPCAddress().String()) + suite.client, suite.closer, err = accessAPIClient(suite.unsecureGrpcServer.GRPCAddress().String()) require.NoError(suite.T(), err) } @@ -140,8 +178,9 @@ func (suite *RateLimitTestSuite) TearDownTest() { if suite.closer != nil { suite.closer.Close() } - // close the server - unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Done(), 2*time.Second) + // close servers + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Done(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Done(), 2*time.Second) } func TestRateLimit(t *testing.T) { diff --git a/engine/access/secure_grpcr_test.go b/engine/access/secure_grpcr_test.go index b82160668db..82fbfa0bd5a 100644 --- a/engine/access/secure_grpcr_test.go +++ b/engine/access/secure_grpcr_test.go @@ -7,18 +7,21 @@ import ( "testing" "time" - accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/credentials/insecure" + + accessproto "github.com/onflow/flow/protobuf/go/flow/access" "github.com/onflow/flow-go/crypto" accessmock "github.com/onflow/flow-go/engine/access/mock" "github.com/onflow/flow-go/engine/access/rpc" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" module "github.com/onflow/flow-go/module/mock" @@ -55,6 +58,10 @@ type SecureGRPCTestSuite struct { ctx irrecoverable.SignalerContext cancel context.CancelFunc + + // grpc servers + secureGrpcServer *grpcserver.GrpcServer + unsecureGrpcServer *grpcserver.GrpcServer } func (suite *SecureGRPCTestSuite) SetupTest() { @@ -105,6 +112,21 @@ func (suite *SecureGRPCTestSuite) SetupTest() { // save the public key to use later in tests later suite.publicKey = networkingKey.PublicKey() + suite.secureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.SecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil, + grpcserver.WithTransportCredentials(config.TransportCredentials)).Build() + + suite.unsecureGrpcServer = grpcserver.NewGrpcServerBuilder(suite.log, + config.UnsecureGRPCListenAddr, + grpcutils.DefaultMaxMsgSize, + false, + nil, + nil).Build() + block := unittest.BlockHeaderFixture() suite.snapshot.On("Head").Return(block, nil) @@ -126,21 +148,32 @@ func (suite *SecureGRPCTestSuite) SetupTest() { 0, false, false, - nil, - nil, suite.me, + suite.secureGrpcServer, + suite.unsecureGrpcServer, ) assert.NoError(suite.T(), err) suite.rpcEng, err = rpcEngBuilder.WithLegacy().Build() assert.NoError(suite.T(), err) suite.ctx, suite.cancel = irrecoverable.NewMockSignalerContextWithCancel(suite.T(), context.Background()) + suite.rpcEng.Start(suite.ctx) - // wait for the server to startup + + suite.secureGrpcServer.Start(suite.ctx) + suite.unsecureGrpcServer.Start(suite.ctx) + + // wait for the servers to startup + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Ready(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Ready(), 2*time.Second) + + // wait for the engine to startup unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Ready(), 2*time.Second) } func (suite *SecureGRPCTestSuite) TearDownTest() { suite.cancel() + unittest.AssertClosesBefore(suite.T(), suite.secureGrpcServer.Done(), 2*time.Second) + unittest.AssertClosesBefore(suite.T(), suite.unsecureGrpcServer.Done(), 2*time.Second) unittest.AssertClosesBefore(suite.T(), suite.rpcEng.Done(), 2*time.Second) } @@ -172,6 +205,19 @@ func (suite *SecureGRPCTestSuite) TestAPICallUsingSecureGRPC() { _, err := client.Ping(ctx, req) assert.Error(suite.T(), err) }) + + suite.Run("happy path - connection fails, unsecure client can not get info from secure server connection", func() { + conn, err := grpc.Dial( + suite.secureGrpcServer.GRPCAddress().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + assert.NoError(suite.T(), err) + + client := accessproto.NewAccessAPIClient(conn) + closer := io.Closer(conn) + defer closer.Close() + + _, err = client.Ping(ctx, req) + assert.Error(suite.T(), err) + }) } // secureGRPCClient creates a secure GRPC client using the given public key @@ -180,7 +226,7 @@ func (suite *SecureGRPCTestSuite) secureGRPCClient(publicKey crypto.PublicKey) ( assert.NoError(suite.T(), err) conn, err := grpc.Dial( - suite.rpcEng.SecureGRPCAddress().String(), + suite.secureGrpcServer.GRPCAddress().String(), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) assert.NoError(suite.T(), err) diff --git a/engine/access/state_stream/engine.go b/engine/access/state_stream/engine.go index 7b50c2e3ff8..cb3a3e73813 100644 --- a/engine/access/state_stream/engine.go +++ b/engine/access/state_stream/engine.go @@ -2,20 +2,17 @@ package state_stream import ( "fmt" - "net" "time" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" access "github.com/onflow/flow/protobuf/go/flow/executiondata" "github.com/rs/zerolog" - "google.golang.org/grpc" "github.com/onflow/flow-go/engine" - "github.com/onflow/flow-go/engine/common/rpc" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/executiondatasync/execution_data" "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/state/protocol" "github.com/onflow/flow-go/storage" @@ -61,7 +58,6 @@ type Engine struct { *component.ComponentManager log zerolog.Logger backend *StateStreamBackend - server *grpc.Server config Config chain flow.Chain handler *Handler @@ -69,8 +65,6 @@ type Engine struct { execDataBroadcaster *engine.Broadcaster execDataCache *cache.ExecutionDataCache headers storage.Headers - - stateStreamGrpcAddress net.Addr } // NewEng returns a new ingress server. @@ -86,45 +80,10 @@ func NewEng( chainID flow.ChainID, initialBlockHeight uint64, highestBlockHeight uint64, - apiRatelimits map[string]int, // the api rate limit (max calls per second) for each of the gRPC API e.g. Ping->100, GetExecutionDataByBlockID->300 - apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the gRPC API e.g. Ping->50, GetExecutionDataByBlockID->10 + server *grpcserver.GrpcServer, ) (*Engine, error) { logger := log.With().Str("engine", "state_stream_rpc").Logger() - // create a GRPC server to serve GRPC clients - grpcOpts := []grpc.ServerOption{ - grpc.MaxRecvMsgSize(int(config.MaxExecutionDataMsgSize)), - grpc.MaxSendMsgSize(int(config.MaxExecutionDataMsgSize)), - } - - // ordered list of interceptors - var unaryInterceptors []grpc.UnaryServerInterceptor - - // if rpc metrics is enabled, add the grpc metrics interceptor as a server option - if config.RpcMetricsEnabled { - unaryInterceptors = append(unaryInterceptors, grpc_prometheus.UnaryServerInterceptor) - - // note: intentionally not adding logging or rate limit interceptors for streams. - // rate limiting is done in the handler, and we don't need log events for every message as - // that would be too noisy. - grpcOpts = append(grpcOpts, grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor)) - } - - if len(apiRatelimits) > 0 { - // create a rate limit interceptor - rateLimitInterceptor := rpc.NewRateLimiterInterceptor(log, apiRatelimits, apiBurstLimits).UnaryServerInterceptor - // append the rate limit interceptor to the list of interceptors - unaryInterceptors = append(unaryInterceptors, rateLimitInterceptor) - } - - // add the logging interceptor, ensure it is innermost wrapper - unaryInterceptors = append(unaryInterceptors, rpc.LoggingInterceptor(log)...) - - // create a chained unary interceptor - grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(unaryInterceptors...)) - - server := grpc.NewServer(grpcOpts...) - broadcaster := engine.NewBroadcaster() backend, err := New( @@ -147,7 +106,6 @@ func NewEng( e := &Engine{ log: logger, backend: backend, - server: server, headers: headers, chain: chainID.Chain(), config: config, @@ -157,10 +115,13 @@ func NewEng( } e.ComponentManager = component.NewComponentManagerBuilder(). - AddWorker(e.serve). + AddWorker(func(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + <-server.Done() + }). Build() - access.RegisterExecutionDataAPIServer(e.server, e.handler) + access.RegisterExecutionDataAPIServer(server.Server, e.handler) return e, nil } @@ -191,27 +152,3 @@ func (e *Engine) OnExecutionData(executionData *execution_data.BlockExecutionDat e.execDataBroadcaster.Publish() } - -// serve starts the gRPC server. -// When this function returns, the server is considered ready. -func (e *Engine) serve(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { - e.log.Info().Str("state_stream_address", e.config.ListenAddr).Msg("starting grpc server on address") - l, err := net.Listen("tcp", e.config.ListenAddr) - if err != nil { - ctx.Throw(fmt.Errorf("error starting grpc server: %w", err)) - } - - e.stateStreamGrpcAddress = l.Addr() - e.log.Debug().Str("state_stream_address", e.stateStreamGrpcAddress.String()).Msg("listening on port") - - go func() { - ready() - err = e.server.Serve(l) - if err != nil { - ctx.Throw(fmt.Errorf("error trying to serve grpc server: %w", err)) - } - }() - - <-ctx.Done() - e.server.GracefulStop() -} diff --git a/module/grpcserver/server.go b/module/grpcserver/server.go new file mode 100644 index 00000000000..309cb9315f2 --- /dev/null +++ b/module/grpcserver/server.go @@ -0,0 +1,88 @@ +package grpcserver + +import ( + "net" + "sync" + + "github.com/rs/zerolog" + + "google.golang.org/grpc" + + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" +) + +// GrpcServer wraps `grpc.Server` and allows to manage it using `component.Component` interface. It can be injected +// into different engines making it possible to use single grpc server for multiple services which live in different modules. +type GrpcServer struct { + component.Component + log zerolog.Logger + Server *grpc.Server + + grpcListenAddr string // the GRPC server address as ip:port + + addrLock sync.RWMutex + grpcAddress net.Addr +} + +var _ component.Component = (*GrpcServer)(nil) + +// NewGrpcServer returns a new grpc server. +func NewGrpcServer(log zerolog.Logger, + grpcListenAddr string, + grpcServer *grpc.Server, +) *GrpcServer { + server := &GrpcServer{ + log: log, + Server: grpcServer, + grpcListenAddr: grpcListenAddr, + } + server.Component = component.NewComponentManagerBuilder(). + AddWorker(server.serveGRPCWorker). + AddWorker(server.shutdownWorker). + Build() + return server +} + +// serveGRPCWorker is a worker routine which starts the gRPC server. +// The ready callback is called after the server address is bound and set. +func (g *GrpcServer) serveGRPCWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + g.log = g.log.With().Str("grpc_address", g.grpcListenAddr).Logger() + g.log.Info().Msg("starting grpc server on address") + + l, err := net.Listen("tcp", g.grpcListenAddr) + if err != nil { + g.log.Err(err).Msg("failed to start the grpc server") + ctx.Throw(err) + return + } + + // save the actual address on which we are listening (may be different from g.config.GRPCListenAddr if not port + // was specified) + g.addrLock.Lock() + g.grpcAddress = l.Addr() + g.addrLock.Unlock() + g.log.Debug().Msg("listening on port") + ready() + + err = g.Server.Serve(l) // blocking call + if err != nil { + g.log.Err(err).Msg("fatal error in grpc server") + ctx.Throw(err) + } +} + +// GRPCAddress returns the listen address of the GRPC server. +// Guaranteed to be non-nil after Engine.Ready is closed. +func (g *GrpcServer) GRPCAddress() net.Addr { + g.addrLock.RLock() + defer g.addrLock.RUnlock() + return g.grpcAddress +} + +// shutdownWorker is a worker routine which shuts down server when the context is cancelled. +func (g *GrpcServer) shutdownWorker(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { + ready() + <-ctx.Done() + g.Server.GracefulStop() +} diff --git a/module/grpcserver/server_builder.go b/module/grpcserver/server_builder.go new file mode 100644 index 00000000000..d42196cdf12 --- /dev/null +++ b/module/grpcserver/server_builder.go @@ -0,0 +1,107 @@ +package grpcserver + +import ( + "github.com/rs/zerolog" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + + "github.com/onflow/flow-go/engine/common/rpc" +) + +type Option func(*GrpcServerBuilder) + +// WithTransportCredentials sets the transport credentials parameters for a grpc server builder. +func WithTransportCredentials(transportCredentials credentials.TransportCredentials) Option { + return func(c *GrpcServerBuilder) { + c.transportCredentials = transportCredentials + } +} + +// WithStreamInterceptor sets the StreamInterceptor option to grpc server. +func WithStreamInterceptor() Option { + return func(c *GrpcServerBuilder) { + c.stateStreamInterceptorEnable = true + } +} + +// GrpcServerBuilder created for separating the creation and starting GrpcServer, +// cause services need to be registered before the server starts. +type GrpcServerBuilder struct { + log zerolog.Logger + gRPCListenAddr string + server *grpc.Server + + transportCredentials credentials.TransportCredentials // the GRPC credentials + stateStreamInterceptorEnable bool +} + +// NewGrpcServerBuilder helps to build a new grpc server. +func NewGrpcServerBuilder(log zerolog.Logger, + gRPCListenAddr string, + maxMsgSize uint, + rpcMetricsEnabled bool, + apiRateLimits map[string]int, // the api rate limit (max calls per second) for each of the Access API e.g. Ping->100, GetTransaction->300 + apiBurstLimits map[string]int, // the api burst limit (max calls at the same time) for each of the Access API e.g. Ping->50, GetTransaction->10 + opts ...Option, +) *GrpcServerBuilder { + log = log.With().Str("component", "grpc_server").Logger() + + grpcServerBuilder := &GrpcServerBuilder{ + gRPCListenAddr: gRPCListenAddr, + } + + for _, applyOption := range opts { + applyOption(grpcServerBuilder) + } + + // create a GRPC server to serve GRPC clients + grpcOpts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(int(maxMsgSize)), + grpc.MaxSendMsgSize(int(maxMsgSize)), + } + var interceptors []grpc.UnaryServerInterceptor // ordered list of interceptors + // if rpc metrics is enabled, first create the grpc metrics interceptor + if rpcMetricsEnabled { + interceptors = append(interceptors, grpc_prometheus.UnaryServerInterceptor) + + if grpcServerBuilder.stateStreamInterceptorEnable { + // note: intentionally not adding logging or rate limit interceptors for streams. + // rate limiting is done in the handler, and we don't need log events for every message as + // that would be too noisy. + log.Info().Msg("stateStreamInterceptorEnable true") + grpcOpts = append(grpcOpts, grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor)) + } else { + log.Info().Msg("stateStreamInterceptorEnable false") + } + } + if len(apiRateLimits) > 0 { + // create a rate limit interceptor + rateLimitInterceptor := rpc.NewRateLimiterInterceptor(log, apiRateLimits, apiBurstLimits).UnaryServerInterceptor + // append the rate limit interceptor to the list of interceptors + interceptors = append(interceptors, rateLimitInterceptor) + } + // add the logging interceptor, ensure it is innermost wrapper + interceptors = append(interceptors, rpc.LoggingInterceptor(log)...) + // create a chained unary interceptor + // create an unsecured grpc server + grpcOpts = append(grpcOpts, grpc.ChainUnaryInterceptor(interceptors...)) + + if grpcServerBuilder.transportCredentials != nil { + log = log.With().Str("endpoint", "secure").Logger() + // create a secure server by using the secure grpc credentials that are passed in as part of config + grpcOpts = append(grpcOpts, grpc.Creds(grpcServerBuilder.transportCredentials)) + } else { + log = log.With().Str("endpoint", "unsecure").Logger() + } + grpcServerBuilder.log = log + grpcServerBuilder.server = grpc.NewServer(grpcOpts...) + + return grpcServerBuilder +} + +func (b *GrpcServerBuilder) Build() *GrpcServer { + return NewGrpcServer(b.log, b.gRPCListenAddr, b.server) +}