Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make gRPC requests go through tendermint Query #8549

Merged
merged 19 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ func (app *BaseApp) InitChain(req abci.RequestInitChain) (res abci.ResponseInitC
app.StoreConsensusParams(app.deliverState.ctx, req.ConsensusParams)
}

app.grpcQueryRouter.dryRunMethodHandlers(app.checkState.ctx)
Copy link
Contributor Author

@amaury1093 amaury1093 Feb 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Status update: as per #8549 (comment), I added a returnType map[string]reflect.Type which maps each FQ method name to its return type. This map get populated when calling dryRunMethodHandlers, but i'm not sure from where to call it.

(Note: to dry run the method handlers, we need a context.Context, and I'm passing a sdk.WrapSDKContext(ctx) right now)

I tried:

  • populating returnTypes immediately on RegisterService (so that we wouldn't even need a separate dryRunMethodHandlers function) using a dummy sdk.Context. But it panicked on some nil stores.
  • calling dryRunMethodHandlers in InitChain, panicking for unmarshalling empty bytes request.

I'm thinking of a way to avoid panicking on all grpc query handlers. Maybe there's a way to do it with context.CancelFunc instead of sdk.WrapSDKContext(someSdkCtx)? Some other ideas?

Copy link
Member

@aaronc aaronc Feb 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can pre-run these in RegisterService I think. I'll take a quick look

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually looks pretty nasty... I remember we had this issue before.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually if this is all the same process we can just capture the return type the first time a real query runs. I know that's super hacky but it should work unless someone is running grpc stand-alone.

A more correct way would be - read grpc schema from message descriptors, then look up the return type name, then look up the actual go type from the proto type registry.

What do you think?

Copy link
Contributor Author

@amaury1093 amaury1093 Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The grpc reflection way also seems more correct to me, however:

then look up the return type name

I can't seem to find where to find the mapping of fq method name to return type (in grpc-go code)

Actually if this is all the same process we can just capture the return type the first time a real query runs. I know that's super hacky but it should work unless someone is running grpc stand-alone.

For now I pushed this version

Copy link
Member

@aaronc aaronc Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get the file descriptor with proto.FileDescriptor and the go type with proto.MessageType. Does that help? This involves actually parsing a FileDescriptor...

Copy link
Contributor Author

@amaury1093 amaury1093 Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This involves actually parsing a FileDescriptor

Ah right. I would need to define some heuristics to guess the file name from the fq method name, right? Will try that, but seems hacky too...

Copy link
Contributor Author

@amaury1093 amaury1093 Feb 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'll be hacky, we have e.g. FileDescriptor for cosmos/base/reflection/v1beta1/reflection.proto, and there's no way we can guess the file path from the fq service name (e.g. cosmos.base.reflection.v1beta1.ReflectionService), unless introducing a lot of ifs.

Maybe let's just stay with capturing the return type on 1st call?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have access to a reflection grpc service then we can actually just use that to get the descriptors. But honestly hopefully this is just a short term fix and we deal with the concurrency issues soon. I would hate for the whole abci pipeline to be forever single threaded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have access to a reflection grpc service then we can actually just use that to get the descriptors.

We'd need a client for that, it seems overkill. i also remember @clevinson running into issues with reflection because we're using gogoproto.

anyways, the current approach should be good enough for now.


if app.initChainer == nil {
return
}
Expand Down
24 changes: 23 additions & 1 deletion baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"fmt"
"reflect"

gogogrpc "github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types"
Expand All @@ -19,6 +20,7 @@ var protoCodec = encoding.GetCodec(proto.Name)
// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
returnTypes map[string]reflect.Type // map of a FQ method name to its return type.
amaury1093 marked this conversation as resolved.
Show resolved Hide resolved
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}
Expand All @@ -34,7 +36,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
returnTypes: map[string]reflect.Type{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -82,6 +85,13 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
// call the method handler from the service description with the handler object,
// a wrapped sdk.Context with proto-unmarshaled data from the ABCI request data
res, err := methodHandler(handler, sdk.WrapSDKContext(ctx), func(i interface{}) error {
// If it's the first time we call this handler, then we save
// the return type of the handler in the `returnTypes` map.
// The return type will be used for decoding subsequent requests.
if _, found := qrt.returnTypes[fqName]; !found {
qrt.returnTypes[fqName] = reflect.TypeOf(i)
}

err := protoCodec.Unmarshal(req.Data, i)
if err != nil {
return err
Expand Down Expand Up @@ -115,6 +125,18 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
})
}

// dryRunMethodHandlers runs all methods saved in `routes` once. This should
// be called once at startup (after the stores are mounted), to populate the
// `returnTypes` map. It is called from InitChain.
func (qrt *GRPCQueryRouter) dryRunMethodHandlers(ctx sdk.Context) {
for fqName, handler := range qrt.routes {
fmt.Println("dry running fqNanem", fqName)
handler(ctx, abci.RequestQuery{
Path: fqName,
})
}
}

// SetInterfaceRegistry sets the interface registry for the router. This will
// also register the interface reflection gRPC service.
func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) {
Expand Down
44 changes: 28 additions & 16 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package baseapp

import (
"context"
"reflect"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
"github.com/gogo/protobuf/proto"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
abci "github.com/tendermint/tendermint/abci/types"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/client"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
)
Expand All @@ -21,10 +24,10 @@ import (
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
Expand All @@ -38,31 +41,40 @@ func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
if err != nil {
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
"baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
if err := checkNegativeHeight(height); err != nil {
return nil, err
}
}

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
reqProto, ok := req.(proto.Message)
if !ok {
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (proto.Message)(nil), req)
}
reqBz, err := proto.Marshal(reqProto)
if err != nil {
return nil, err
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "cannot proto marshal: %v", err)
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)
abciReq := abci.RequestQuery{
Path: info.FullMethod,
Data: reqBz,
Height: height,
Prove: false, // Enable here: https://github.com/cosmos/cosmos-sdk/issues/7036
}
abciRes, err := clientCtx.QueryABCI(abciReq)
if err != nil {
return nil, err
}

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
res := reflect.New(app.GRPCQueryRouter().returnTypes[info.FullMethod]).Elem().Interface()
err = protoCodec.Unmarshal(abciRes.Value, res)
if err != nil {
return nil, err
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
return
}

// Loop through all services and methods, add the interceptor, and register
Expand Down
5 changes: 3 additions & 2 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server/types"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)
app.RegisterGRPCServer(clientCtx, grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
Expand Down
2 changes: 1 addition & 1 deletion server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
grpcWebSrv *http.Server
)
if config.GRPC.Enable {
grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address)
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type (

// RegisterGRPCServer registers gRPC services directly with the gRPC
// server.
RegisterGRPCServer(grpc.Server)
RegisterGRPCServer(client.Context, grpc.Server)

// RegisterTxService registers the gRPC Query service for tx (such as tx
// simulation, fetching txs by hash...).
Expand Down
2 changes: 1 addition & 1 deletion testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func startInProcess(cfg Config, val *Validator) error {
}

if val.AppConfig.GRPC.Enable {
grpcSrv, err := servergrpc.StartGRPCServer(app, val.AppConfig.GRPC.Address)
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC.Address)
if err != nil {
return err
}
Expand Down