Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make gRPC requests go through tendermint Query #8549

Merged
merged 19 commits into from
Feb 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions baseapp/grpcrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"fmt"
"reflect"

gogogrpc "github.com/gogo/protobuf/grpc"
abci "github.com/tendermint/tendermint/abci/types"
Expand All @@ -12,13 +13,19 @@ import (
"github.com/cosmos/cosmos-sdk/client/grpc/reflection"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
)

var protoCodec = encoding.GetCodec(proto.Name)

// GRPCQueryRouter routes ABCI Query requests to GRPC handlers
type GRPCQueryRouter struct {
routes map[string]GRPCQueryHandler
routes map[string]GRPCQueryHandler
// returnTypes is a map of FQ method name => its return type. It is used
// for cache purposes: the first time a method handler is run, we save its
// return type in this map. Then, on subsequent method handler calls, we
// decode the ABCI response bytes using the cached return type.
returnTypes map[string]reflect.Type
interfaceRegistry codectypes.InterfaceRegistry
serviceData []serviceData
}
Expand All @@ -34,7 +41,8 @@ var _ gogogrpc.Server = &GRPCQueryRouter{}
// NewGRPCQueryRouter creates a new GRPCQueryRouter
func NewGRPCQueryRouter() *GRPCQueryRouter {
return &GRPCQueryRouter{
routes: map[string]GRPCQueryHandler{},
returnTypes: map[string]reflect.Type{},
routes: map[string]GRPCQueryHandler{},
}
}

Expand Down Expand Up @@ -89,8 +97,17 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf
if qrt.interfaceRegistry != nil {
return codectypes.UnpackInterfaces(i, qrt.interfaceRegistry)
}

return nil
}, nil)

// If it's the first time we call this handler, then we save
// the return type of the handler in the `returnTypes` map.
// The return type will be used for decoding subsequent requests.
if _, found := qrt.returnTypes[fqName]; !found {
qrt.returnTypes[fqName] = reflect.TypeOf(res)
}

if err != nil {
return abci.ResponseQuery{}, err
}
Expand Down Expand Up @@ -127,3 +144,16 @@ func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.In
reflection.NewReflectionServiceServer(interfaceRegistry),
)
}

// returnTypeOf returns the return type of a gRPC method handler. With the way the
// `returnTypes` cache map is set up, the return type of a method handler is
// guaranteed to be found if it's retrieved **after** the method handler ran at
// least once. If not, then a logic error is return.
func (qrt *GRPCQueryRouter) returnTypeOf(method string) (reflect.Type, error) {
returnType, found := qrt.returnTypes[method]
if !found {
return nil, sdkerrors.Wrapf(sdkerrors.ErrLogic, "cannot find %s return type", method)
}

return returnType, nil
}
83 changes: 47 additions & 36 deletions baseapp/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,67 +2,78 @@ package baseapp

import (
"context"
"strconv"
"reflect"

gogogrpc "github.com/gogo/protobuf/grpc"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpcrecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/client"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
)

// GRPCQueryRouter returns the GRPCQueryRouter of a BaseApp.
func (app *BaseApp) GRPCQueryRouter() *GRPCQueryRouter { return app.grpcQueryRouter }

// RegisterGRPCServer registers gRPC services directly with the gRPC server.
func (app *BaseApp) RegisterGRPCServer(server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will create
// a new sdk.Context, and pass it into the query handler.
interceptor := func(grpcCtx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// If there's some metadata in the context, retrieve it.
md, ok := metadata.FromIncomingContext(grpcCtx)
if !ok {
return nil, status.Error(codes.Internal, "unable to retrieve metadata")
}
func (app *BaseApp) RegisterGRPCServer(clientCtx client.Context, server gogogrpc.Server) {
// Define an interceptor for all gRPC queries: this interceptor will route
// the query through the `clientCtx`, which itself queries Tendermint.
interceptor := func(grpcCtx context.Context, req interface{}, info *grpc.UnaryServerInfo, _ grpc.UnaryHandler) (interface{}, error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which case we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// Get height header from the request context, if present.
var height int64
if heightHeaders := md.Get(grpctypes.GRPCBlockHeightHeader); len(heightHeaders) > 0 {
height, err = strconv.ParseInt(heightHeaders[0], 10, 64)
if err != nil {
return nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"Baseapp.RegisterGRPCServer: invalid height header %q: %v", grpctypes.GRPCBlockHeightHeader, err)
}
if err := checkNegativeHeight(height); err != nil {
return nil, err
// Case 1. Broadcasting a Tx.
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return nil, sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}

return client.TxServiceBroadcast(grpcCtx, clientCtx, reqProto)
}

// Case 2. Querying state.
inMd, _ := metadata.FromIncomingContext(grpcCtx)
abciRes, outMd, err := client.RunGRPCQuery(clientCtx, grpcCtx, info.FullMethod, req, inMd)
if err != nil {
return nil, err
}

// Create the sdk.Context. Passing false as 2nd arg, as we can't
// actually support proofs with gRPC right now.
sdkCtx, err := app.createQueryContext(height, false)
// We need to know the return type of the grpc method for
// unmarshalling abciRes.Value.
//
// When we call each method handler for the first time, we save its
// return type in the `returnTypes` map (see the method handler in
// `grpcrouter.go`). By this time, the method handler has already run
// at least once (in the RunGRPCQuery call), so we're sure the
// returnType maps is populated for this method. We're retrieving it
// for decoding.
returnType, err := app.GRPCQueryRouter().returnTypeOf(info.FullMethod)
if err != nil {
return nil, err
}

// Attach the sdk.Context into the gRPC's context.Context.
grpcCtx = context.WithValue(grpcCtx, sdk.SdkContextKey, sdkCtx)
// returnType is a pointer to a struct. Here, we're creating res which
// is a new pointer to the underlying struct.
res := reflect.New(returnType.Elem()).Interface()

// Add relevant gRPC headers
if height == 0 {
height = sdkCtx.BlockHeight() // If height was not set in the request, set it to the latest
err = protoCodec.Unmarshal(abciRes.Value, res)
if err != nil {
return nil, err
}

// Send the metadata header back. The metadata currently includes:
// - block height.
err = grpc.SendHeader(grpcCtx, outMd)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we use SendHeader instead of SetHeader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know why before SetHeader was used?

if err != nil {
return nil, err
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(height, 10))
grpc.SetHeader(grpcCtx, md)

return handler(grpcCtx, req)
return res, nil
}

// Loop through all services and methods, add the interceptor, and register
Expand Down
99 changes: 54 additions & 45 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,100 +24,109 @@ var _ gogogrpc.ClientConn = Context{}
var protoCodec = encoding.GetCodec(proto.Name)

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, req, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(args).IsNil() {
// In both cases, we don't allow empty request req (it will panic unexpectedly).
if reflect.ValueOf(req).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if isBroadcast(method) {
req, ok := args.(*tx.BroadcastTxRequest)
if reqProto, ok := req.(*tx.BroadcastTxRequest); ok {
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), req)
}
res, ok := reply.(*tx.BroadcastTxResponse)
resProto, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), req)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, reqProto)
if err != nil {
return err
}
*res = *broadcastRes
*resProto = *broadcastRes

return err
}

// Case 2. Querying state.
reqBz, err := protoCodec.Marshal(args)
inMd, _ := metadata.FromOutgoingContext(grpcCtx)
abciRes, outMd, err := RunGRPCQuery(ctx, grpcCtx, method, req, inMd)
if err != nil {
return err
}

err = protoCodec.Unmarshal(abciRes.Value, reply)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be possible to store an object in the abciRes to avoid the serialization round trip? RunGRPCQuery would need to have additional parameter: serialize bool (or thous would be set in a header). If not set, we don't serialize, and instead set the value in the response directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

abciRes comes from tendermint, so not easily.

We could create a wrapper around it, for optimization (not serializing/deserializing the same structs multiple times...) as you say. But that's a bigger refactor, and if we're talking about bigger refactors, a more sensible one would be to allow concurrent gRPC queries.

if err != nil {
return err
}

for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = outMd
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

// RunGRPCQuery runs a gRPC query from the clientCtx, given all necessary
// arguments for the gRPC method, and returns the ABCI response. It is used
// to factorize code between client (Invoke) and server (RegisterGRPCServer)
// gRPC handlers.
func RunGRPCQuery(ctx Context, grpcCtx gocontext.Context, method string, req interface{}, md metadata.MD) (abci.ResponseQuery, metadata.MD, error) {
reqBz, err := protoCodec.Marshal(req)
if err != nil {
return abci.ResponseQuery{}, nil, err
}

// parse height header
md, _ := metadata.FromOutgoingContext(grpcCtx)
if heights := md.Get(grpctypes.GRPCBlockHeightHeader); len(heights) > 0 {
height, err := strconv.ParseInt(heights[0], 10, 64)
if err != nil {
return err
return abci.ResponseQuery{}, nil, err
}
if height < 0 {
return sdkerrors.Wrapf(
return abci.ResponseQuery{}, nil, sdkerrors.Wrapf(
sdkerrors.ErrInvalidRequest,
"client.Context.Invoke: height (%d) from %q must be >= 0", height, grpctypes.GRPCBlockHeightHeader)
}

ctx = ctx.WithHeight(height)
}

req := abci.RequestQuery{
abciReq := abci.RequestQuery{
Path: method,
Data: reqBz,
}

res, err := ctx.QueryABCI(req)
abciRes, err := ctx.QueryABCI(abciReq)
if err != nil {
return err
}

err = protoCodec.Unmarshal(res.Value, reply)
if err != nil {
return err
return abci.ResponseQuery{}, nil, err
}

// Create header metadata. For now the headers contain:
// - block height
// We then parse all the call options, if the call option is a
// HeaderCallOption, then we manually set the value of that header to the
// metadata.
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(res.Height, 10))
for _, callOpt := range opts {
header, ok := callOpt.(grpc.HeaderCallOption)
if !ok {
continue
}

*header.HeaderAddr = md
}

if ctx.InterfaceRegistry != nil {
return types.UnpackInterfaces(reply, ctx.InterfaceRegistry)
}

return nil
}

// NewStream implements the grpc ClientConn.NewStream method
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}
md = metadata.Pairs(grpctypes.GRPCBlockHeightHeader, strconv.FormatInt(abciRes.Height, 10))

func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
return abciRes, md, nil
}
5 changes: 3 additions & 2 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/server/types"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(app types.Application, address string) (*grpc.Server, error) {
func StartGRPCServer(clientCtx client.Context, app types.Application, address string) (*grpc.Server, error) {
grpcSrv := grpc.NewServer()
app.RegisterGRPCServer(grpcSrv)
app.RegisterGRPCServer(clientCtx, grpcSrv)

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
Expand Down
8 changes: 3 additions & 5 deletions server/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
grpc.Header(&header),
)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{"1"}, blockHeight)
s.Require().NotEmpty(blockHeight[0]) // blockHeight is []string, first element is block height.
}

func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
Expand Down Expand Up @@ -124,8 +124,6 @@ func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
Events: []string{"message.action=send"},
},
)
// TODO Once https://github.com/cosmos/cosmos-sdk/pull/8029 is merged, this
// should not error anymore.
s.Require().NoError(err)
}

Expand Down Expand Up @@ -185,9 +183,9 @@ func (s *IntegrationTestSuite) TestGRPCServerInvalidHeaderHeights() {
value string
wantErr string
}{
{"-1", "height < 0"},
{"-1", "\"x-cosmos-block-height\" must be >= 0"},
{"9223372036854775808", "value out of range"}, // > max(int64) by 1
{"-10", "height < 0"},
{"-10", "\"x-cosmos-block-height\" must be >= 0"},
{"18446744073709551615", "value out of range"}, // max uint64, which is > max(int64)
{"-9223372036854775809", "value out of range"}, // Out of the range of for negative int64
}
Expand Down
2 changes: 1 addition & 1 deletion server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
grpcWebSrv *http.Server
)
if config.GRPC.Enable {
grpcSrv, err = servergrpc.StartGRPCServer(app, config.GRPC.Address)
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC.Address)
if err != nil {
return err
}
Expand Down
Loading