Skip to content

Commit

Permalink
Add tx broadcast gRPC endpoint (#7852)
Browse files Browse the repository at this point in the history
* WIP tx/broadcast grpc endpoint

* fix lint

* fix proto lint

* Update service.proto

* resolve conflicts

* update service.proto

* Update service.proto

* review changes

* proto lint

* Switch to txraw

* Add check breaking at the end

* Fix broadcast

* Send Msg on SetupSuite

* Remove proto-check-breaking

* 1 validator in test

* Add grpc server tests for broadcast

* Fix grpc server tests

* Add some changes

* Add ress comments

* Add table tests for tx service

* Add test for mode

* Add simulate tests

* Add build flag back

* Revert custom stringer for enum

* Remove stray logs

* Use /txs/{hash}

Co-authored-by: Amaury Martiny <amaury.martiny@protonmail.com>
Co-authored-by: Aleksandr Bezobchuk <alexanderbez@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 2, 2020
1 parent f57828c commit b6c8d5e
Show file tree
Hide file tree
Showing 12 changed files with 1,146 additions and 206 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ devdoc-update:
### Protobuf ###
###############################################################################

proto-all: proto-format proto-lint proto-check-breaking proto-gen
proto-all: proto-format proto-lint proto-gen

proto-gen:
@echo "Generating Protobuf files"
Expand Down
36 changes: 36 additions & 0 deletions client/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@ import (

"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/mempool"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cosmos/cosmos-sdk/client/flags"
sdk "github.com/cosmos/cosmos-sdk/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
"github.com/cosmos/cosmos-sdk/types/tx"
)

// BroadcastTx broadcasts a transactions either synchronously or asynchronously
Expand Down Expand Up @@ -142,3 +145,36 @@ func (ctx Context) BroadcastTxAsync(txBytes []byte) (*sdk.TxResponse, error) {

return sdk.NewResponseFormatBroadcastTx(res), err
}

// TxServiceBroadcast is a helper function to broadcast a Tx with the correct gRPC types
// from the tx service. Calls `clientCtx.BroadcastTx` under the hood.
func TxServiceBroadcast(grpcCtx context.Context, clientCtx Context, req *tx.BroadcastTxRequest) (*tx.BroadcastTxResponse, error) {
if req == nil || req.TxBytes == nil {
return nil, status.Error(codes.InvalidArgument, "invalid empty tx")
}

clientCtx = clientCtx.WithBroadcastMode(normalizeBroadcastMode(req.Mode))
resp, err := clientCtx.BroadcastTx(req.TxBytes)
if err != nil {
return nil, err
}

return &tx.BroadcastTxResponse{
TxResponse: resp,
}, nil
}

// normalizeBroadcastMode converts a broadcast mode into a normalized string
// to be passed into the clientCtx.
func normalizeBroadcastMode(mode tx.BroadcastMode) string {
switch mode {
case tx.BroadcastMode_BROADCAST_MODE_ASYNC:
return "async"
case tx.BroadcastMode_BROADCAST_MODE_BLOCK:
return "block"
case tx.BroadcastMode_BROADCAST_MODE_SYNC:
return "sync"
default:
return "unspecified"
}
}
41 changes: 38 additions & 3 deletions client/grpc_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
gocontext "context"
"fmt"
"reflect"
"strconv"

gogogrpc "github.com/gogo/protobuf/grpc"
Expand All @@ -12,18 +13,48 @@ import (
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/metadata"

grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"

"github.com/cosmos/cosmos-sdk/codec/types"
sdkerrors "github.com/cosmos/cosmos-sdk/types/errors"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
)

var _ gogogrpc.ClientConn = Context{}

var protoCodec = encoding.GetCodec(proto.Name)

// Invoke implements the grpc ClientConn.Invoke method
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error {
func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply interface{}, opts ...grpc.CallOption) (err error) {
// Two things can happen here:
// 1. either we're broadcasting a Tx, in which call we call Tendermint's broadcast endpoint directly,
// 2. or we are querying for state, in which case we call ABCI's Query.

// In both cases, we don't allow empty request args (it will panic unexpectedly).
if reflect.ValueOf(args).IsNil() {
return sdkerrors.Wrap(sdkerrors.ErrInvalidRequest, "request cannot be nil")
}

// Case 1. Broadcasting a Tx.
if isBroadcast(method) {
req, ok := args.(*tx.BroadcastTxRequest)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxRequest)(nil), args)
}
res, ok := reply.(*tx.BroadcastTxResponse)
if !ok {
return sdkerrors.Wrapf(sdkerrors.ErrInvalidRequest, "expected %T, got %T", (*tx.BroadcastTxResponse)(nil), args)
}

broadcastRes, err := TxServiceBroadcast(grpcCtx, ctx, req)
if err != nil {
return err
}
*res = *broadcastRes

return err
}

// Case 2. Querying state.
reqBz, err := protoCodec.Marshal(args)
if err != nil {
return err
Expand Down Expand Up @@ -86,3 +117,7 @@ func (ctx Context) Invoke(grpcCtx gocontext.Context, method string, args, reply
func (Context) NewStream(gocontext.Context, *grpc.StreamDesc, string, ...grpc.CallOption) (grpc.ClientStream, error) {
return nil, fmt.Errorf("streaming rpc not supported")
}

func isBroadcast(method string) bool {
return method == "/cosmos.tx.v1beta1.Service/BroadcastTx"
}
2 changes: 1 addition & 1 deletion docs/migrations/rest.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Some modules expose legacy `POST` endpoints to generate unsigned transactions fo

| Legacy REST Endpoint | Description | New gGPC-gateway REST Endpoint |
| ------------------------------------------------------------------------------- | ------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------- |
| `GET /txs/{hash}` | Query tx by hash | `GET /cosmos/tx/v1beta1/tx/{hash}` |
| `GET /txs/{hash}` | Query tx by hash | `GET /cosmos/tx/v1beta1/txs/{hash}` |
| `GET /txs` | Query tx by events | `GET /cosmos/tx/v1beta1/txs` |
| `POST /txs` | Broadcast tx | `POST /cosmos/tx/v1beta1/txs` |
| `POST /txs/encode` | Encodes an Amino JSON tx to an Amino binary tx | N/A, use Protobuf directly |
Expand Down
46 changes: 43 additions & 3 deletions proto/cosmos/tx/v1beta1/service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package cosmos.tx.v1beta1;
import "google/api/annotations.proto";
import "cosmos/base/abci/v1beta1/abci.proto";
import "cosmos/tx/v1beta1/tx.proto";
import "gogoproto/gogo.proto";
import "cosmos/base/query/v1beta1/pagination.proto";

option go_package = "github.com/cosmos/cosmos-sdk/types/tx";
Expand All @@ -12,13 +13,22 @@ option go_package = "github.com/cosmos/cosmos-sdk/types/tx";
service Service {
// Simulate simulates executing a transaction for estimating gas usage.
rpc Simulate(SimulateRequest) returns (SimulateResponse) {
option (google.api.http).post = "/cosmos/tx/v1beta1/simulate";
option (google.api.http) = {
post: "/cosmos/tx/v1beta1/simulate"
body: "*"
};
}
// GetTx fetches a tx by hash.
rpc GetTx(GetTxRequest) returns (GetTxResponse) {
option (google.api.http).get = "/cosmos/tx/v1beta1/tx/{hash}";
option (google.api.http).get = "/cosmos/tx/v1beta1/txs/{hash}";
}
// BroadcastTx broadcast transaction.
rpc BroadcastTx(BroadcastTxRequest) returns (BroadcastTxResponse) {
option (google.api.http) = {
post: "/cosmos/tx/v1beta1/txs"
body: "*"
};
}

// GetTxsEvent fetches txs by event.
rpc GetTxsEvent(GetTxsEventRequest) returns (GetTxsEventResponse) {
option (google.api.http).get = "/cosmos/tx/v1beta1/txs";
Expand All @@ -45,6 +55,36 @@ message GetTxsEventResponse {
cosmos.base.query.v1beta1.PageResponse pagination = 3;
}

// BroadcastTxRequest is the request type for the Service.BroadcastTxRequest
// RPC method.
message BroadcastTxRequest {
// tx_bytes is the raw transaction.
bytes tx_bytes = 1;
BroadcastMode mode = 2;
}

// BroadcastMode specifies the broadcast mode for the TxService.Broadcast RPC method.
enum BroadcastMode {
// zero-value for mode ordering
BROADCAST_MODE_UNSPECIFIED = 0;
// BROADCAST_MODE_BLOCK defines a tx broadcasting mode where the client waits for
// the tx to be committed in a block.
BROADCAST_MODE_BLOCK = 1;
// BROADCAST_MODE_SYNC defines a tx broadcasting mode where the client waits for
// a CheckTx execution response only.
BROADCAST_MODE_SYNC = 2;
// BROADCAST_MODE_ASYNC defines a tx broadcasting mode where the client returns
// immediately.
BROADCAST_MODE_ASYNC = 3;
}

// BroadcastTxResponse is the response type for the
// Service.BroadcastTx method.
message BroadcastTxResponse {
// tx_response is the queried TxResponses.
cosmos.base.abci.v1beta1.TxResponse tx_response = 1;
}

// SimulateRequest is the request type for the Service.Simulate
// RPC method.
message SimulateRequest {
Expand Down
88 changes: 73 additions & 15 deletions server/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,54 +13,64 @@ import (
"google.golang.org/grpc/metadata"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"

clienttx "github.com/cosmos/cosmos-sdk/client/tx"
"github.com/cosmos/cosmos-sdk/testutil/network"
"github.com/cosmos/cosmos-sdk/testutil/testdata"
sdk "github.com/cosmos/cosmos-sdk/types"
grpctypes "github.com/cosmos/cosmos-sdk/types/grpc"
"github.com/cosmos/cosmos-sdk/types/tx"
txtypes "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/cosmos/cosmos-sdk/types/tx/signing"
authclient "github.com/cosmos/cosmos-sdk/x/auth/client"
banktypes "github.com/cosmos/cosmos-sdk/x/bank/types"
)

type IntegrationTestSuite struct {
suite.Suite

cfg network.Config
network *network.Network
conn *grpc.ClientConn
}

func (s *IntegrationTestSuite) SetupSuite() {
s.T().Log("setting up integration test suite")

s.network = network.New(s.T(), network.DefaultConfig())
s.cfg = network.DefaultConfig()
s.network = network.New(s.T(), s.cfg)
s.Require().NotNil(s.network)

_, err := s.network.WaitForHeight(2)
s.Require().NoError(err)
}

func (s *IntegrationTestSuite) TearDownSuite() {
s.T().Log("tearing down integration test suite")
s.network.Cleanup()
}

func (s *IntegrationTestSuite) TestGRPCServer() {
val0 := s.network.Validators[0]
conn, err := grpc.Dial(
s.conn, err = grpc.Dial(
val0.AppConfig.GRPC.Address,
grpc.WithInsecure(), // Or else we get "no transport security set"
)
s.Require().NoError(err)
defer conn.Close()
}

func (s *IntegrationTestSuite) TearDownSuite() {
s.T().Log("tearing down integration test suite")
s.conn.Close()
s.network.Cleanup()
}

func (s *IntegrationTestSuite) TestGRPCServer_TestService() {
// gRPC query to test service should work
testClient := testdata.NewQueryClient(conn)
testClient := testdata.NewQueryClient(s.conn)
testRes, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"})
s.Require().NoError(err)
s.Require().Equal("hello", testRes.Message)
}

func (s *IntegrationTestSuite) TestGRPCServer_BankBalance() {
val0 := s.network.Validators[0]

// gRPC query to bank service should work
denom := fmt.Sprintf("%stoken", val0.Moniker)
bankClient := banktypes.NewQueryClient(conn)
bankClient := banktypes.NewQueryClient(s.conn)
var header metadata.MD
bankRes, err := bankClient.Balance(
context.Background(),
Expand All @@ -83,9 +93,11 @@ func (s *IntegrationTestSuite) TestGRPCServer() {
)
blockHeight = header.Get(grpctypes.GRPCBlockHeightHeader)
s.Require().Equal([]string{"1"}, blockHeight)
}

func (s *IntegrationTestSuite) TestGRPCServer_Reflection() {
// Test server reflection
reflectClient := rpb.NewServerReflectionClient(conn)
reflectClient := rpb.NewServerReflectionClient(s.conn)
stream, err := reflectClient.ServerReflectionInfo(context.Background(), grpc.WaitForReady(true))
s.Require().NoError(err)
s.Require().NoError(stream.Send(&rpb.ServerReflectionRequest{
Expand All @@ -100,11 +112,13 @@ func (s *IntegrationTestSuite) TestGRPCServer() {
}
// Make sure the following services are present
s.Require().True(servicesMap["cosmos.bank.v1beta1.Query"])
}

func (s *IntegrationTestSuite) TestGRPCServer_GetTxsEvent() {
// Query the tx via gRPC without pagination. This used to panic, see
// https://github.com/cosmos/cosmos-sdk/issues/8038.
txServiceClient := txtypes.NewServiceClient(conn)
_, err = txServiceClient.GetTxsEvent(
txServiceClient := txtypes.NewServiceClient(s.conn)
_, err := txServiceClient.GetTxsEvent(
context.Background(),
&tx.GetTxsEventRequest{
Events: []string{"message.action=send"},
Expand All @@ -115,6 +129,50 @@ func (s *IntegrationTestSuite) TestGRPCServer() {
s.Require().NoError(err)
}

func (s *IntegrationTestSuite) TestGRPCServer_BroadcastTx() {
val0 := s.network.Validators[0]

// prepare txBuilder with msg
txBuilder := val0.ClientCtx.TxConfig.NewTxBuilder()
feeAmount := sdk.Coins{sdk.NewInt64Coin(s.cfg.BondDenom, 10)}
gasLimit := testdata.NewTestGasLimit()
s.Require().NoError(
txBuilder.SetMsgs(&banktypes.MsgSend{
FromAddress: val0.Address.String(),
ToAddress: val0.Address.String(),
Amount: sdk.Coins{sdk.NewInt64Coin(s.cfg.BondDenom, 10)},
}),
)
txBuilder.SetFeeAmount(feeAmount)
txBuilder.SetGasLimit(gasLimit)

// setup txFactory
txFactory := clienttx.Factory{}.
WithChainID(val0.ClientCtx.ChainID).
WithKeybase(val0.ClientCtx.Keyring).
WithTxConfig(val0.ClientCtx.TxConfig).
WithSignMode(signing.SignMode_SIGN_MODE_DIRECT)

// Sign Tx.
err := authclient.SignTx(txFactory, val0.ClientCtx, val0.Moniker, txBuilder, false)
s.Require().NoError(err)

txBytes, err := val0.ClientCtx.TxConfig.TxEncoder()(txBuilder.GetTx())
s.Require().NoError(err)

// Broadcast the tx via gRPC.
queryClient := tx.NewServiceClient(s.conn)
grpcRes, err := queryClient.BroadcastTx(
context.Background(),
&tx.BroadcastTxRequest{
Mode: tx.BroadcastMode_BROADCAST_MODE_SYNC,
TxBytes: txBytes,
},
)
s.Require().NoError(err)
s.Require().Equal(uint32(0), grpcRes.TxResponse.Code)
}

// Test and enforce that we upfront reject any connections to baseapp containing
// invalid initial x-cosmos-block-height that aren't positive and in the range [0, max(int64)]
// See issue https://github.com/cosmos/cosmos-sdk/issues/7662.
Expand Down
Loading

0 comments on commit b6c8d5e

Please sign in to comment.