diff --git a/runtime/v2/app.go b/runtime/v2/app.go index 06bff4a8d86c..0e28d8e3b5a5 100644 --- a/runtime/v2/app.go +++ b/runtime/v2/app.go @@ -44,9 +44,9 @@ type App[T transaction.Tx] struct { amino legacy.Amino moduleManager *MM[T] - // GRPCQueryDecoders maps gRPC method name to a function that decodes the request + // GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request // bytes into a gogoproto.Message, which then can be passed to appmanager. - GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) + GRPCMethodsToMessageMap map[string]func() gogoproto.Message } // Name returns the app name. @@ -118,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] { return a.AppManager } -func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) { - return a.GRPCQueryDecoders +func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { + return a.GRPCMethodsToMessageMap } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 7f6d7a6d06fb..244353674c5a 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -556,7 +556,7 @@ func (m *MM[T]) assertNoForgottenModules( func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error { c := &configurator{ - grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){}, + grpcQueryDecoders: map[string]func() gogoproto.Message{}, stfQueryRouter: app.queryRouterBuilder, stfMsgRouter: app.msgRouterBuilder, registry: registry, @@ -567,7 +567,10 @@ func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], re if err != nil { return fmt.Errorf("unable to register services: %w", err) } - app.GRPCQueryDecoders = c.grpcQueryDecoders + // merge maps + for path, decoder := range c.grpcQueryDecoders { + app.GRPCMethodsToMessageMap[path] = decoder + } return nil } @@ -576,7 +579,7 @@ var _ grpc.ServiceRegistrar = (*configurator)(nil) type configurator struct { // grpcQueryDecoders is required because module expose queries through gRPC // this provides a way to route to modules using gRPC. - grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error) + grpcQueryDecoders map[string]func() gogoproto.Message stfQueryRouter *stf.MsgRouterBuilder stfMsgRouter *stf.MsgRouterBuilder @@ -618,11 +621,11 @@ func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{ if typ == nil { return fmt.Errorf("unable to find message in gogotype registry: %w", err) } - decoderFunc := func(bytes []byte) (gogoproto.Message, error) { - msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message) - return msg, gogoproto.Unmarshal(bytes, msg) + decoderFunc := func() gogoproto.Message { + return reflect.New(typ.Elem()).Interface().(gogoproto.Message) } - c.grpcQueryDecoders[md.MethodName] = decoderFunc + methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName) + c.grpcQueryDecoders[methodName] = decoderFunc } return nil } diff --git a/runtime/v2/module.go b/runtime/v2/module.go index 2d428fd238c6..fbcf391b1f68 100644 --- a/runtime/v2/module.go +++ b/runtime/v2/module.go @@ -130,11 +130,12 @@ func ProvideAppBuilder[T transaction.Tx]( msgRouterBuilder := stf.NewMsgRouterBuilder() app := &App[T]{ - storeKeys: nil, - interfaceRegistrar: interfaceRegistrar, - amino: amino, - msgRouterBuilder: msgRouterBuilder, - queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router + storeKeys: nil, + interfaceRegistrar: interfaceRegistrar, + amino: amino, + msgRouterBuilder: msgRouterBuilder, + queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router + GRPCMethodsToMessageMap: map[string]func() proto.Message{}, } appBuilder := &AppBuilder[T]{app: app} diff --git a/schema/appdata/mux_test.go b/schema/appdata/mux_test.go index e5fbad0b64e1..70787fada8a5 100644 --- a/schema/appdata/mux_test.go +++ b/schema/appdata/mux_test.go @@ -62,6 +62,7 @@ func TestListenerMux(t *testing.T) { } func callAllCallbacksOnces(t *testing.T, listener Listener) { + t.Helper() if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil { t.Error(err) } diff --git a/server/v2/api/grpc/gogoreflection/serverreflection.go b/server/v2/api/grpc/gogoreflection/serverreflection.go index 077c15c3321a..79f520545a87 100644 --- a/server/v2/api/grpc/gogoreflection/serverreflection.go +++ b/server/v2/api/grpc/gogoreflection/serverreflection.go @@ -42,33 +42,41 @@ import ( "errors" "fmt" "io" - "log" "reflect" "sort" + "strings" "sync" - //nolint: staticcheck // keep this import for backward compatibility - "github.com/golang/protobuf/proto" + gogoproto "github.com/cosmos/gogoproto/proto" dpb "github.com/golang/protobuf/protoc-gen-go/descriptor" "google.golang.org/grpc" "google.golang.org/grpc/codes" rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" + "google.golang.org/protobuf/reflect/protodesc" + "google.golang.org/protobuf/reflect/protoreflect" + + "cosmossdk.io/core/log" ) type serverReflectionServer struct { rpb.UnimplementedServerReflectionServer s *grpc.Server + methods []string + initSymbols sync.Once serviceNames []string symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files + log log.Logger } // Register registers the server reflection service on the given gRPC server. -func Register(s *grpc.Server) { +func Register(s *grpc.Server, methods []string, logger log.Logger) { rpb.RegisterServerReflectionServer(s, &serverReflectionServer{ - s: s, + s: s, + methods: methods, + log: logger, }) } @@ -82,21 +90,12 @@ type protoMessage interface { func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) { s.initSymbols.Do(func() { - serviceInfo := s.s.GetServiceInfo() - s.symbols = map[string]*dpb.FileDescriptorProto{} - s.serviceNames = make([]string, 0, len(serviceInfo)) + services, fds := s.getServices(s.methods) + s.serviceNames = services + processed := map[string]struct{}{} - for svc, info := range serviceInfo { - s.serviceNames = append(s.serviceNames, svc) - fdenc, ok := parseMetadata(info.Metadata) - if !ok { - continue - } - fd, err := decodeFileDesc(fdenc) - if err != nil { - continue - } + for _, fd := range fds { s.processFile(fd, processed) } sort.Strings(s.serviceNames) @@ -207,7 +206,7 @@ func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) { } fd := new(dpb.FileDescriptorProto) - if err := proto.Unmarshal(raw, fd); err != nil { + if err := gogoproto.Unmarshal(raw, fd); err != nil { return nil, fmt.Errorf("bad descriptor: %w", err) } return fd, nil @@ -237,7 +236,7 @@ func typeForName(name string) (reflect.Type, error) { } func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) { - m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message) + m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message) if !ok { return nil, fmt.Errorf("failed to create message from type: %v", st) } @@ -252,7 +251,7 @@ func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescripto } func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) { - m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message) + m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message) if !ok { return nil, fmt.Errorf("failed to create message from type: %v", st) } @@ -272,7 +271,7 @@ func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors m queue = queue[1:] if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent { sentFileDescriptors[currentfd.GetName()] = true - currentfdEncoded, err := proto.Marshal(currentfd) + currentfdEncoded, err := gogoproto.Marshal(currentfd) if err != nil { return nil, err } @@ -305,24 +304,6 @@ func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFil return fileDescWithDependencies(fd, sentFileDescriptors) } -// parseMetadata finds the file descriptor bytes specified meta. -// For SupportPackageIsVersion4, m is the name of the proto file, we -// call proto.FileDescriptor to get the byte slice. -// For SupportPackageIsVersion3, m is a byte slice itself. -func parseMetadata(meta interface{}) ([]byte, bool) { - // Check if meta is the file name. - if fileNameForMeta, ok := meta.(string); ok { - return getFileDescriptor(fileNameForMeta), true - } - - // Check if meta is the byte slice. - if enc, ok := meta.([]byte); ok { - return enc, true - } - - return nil, false -} - // fileDescEncodingContainingSymbol finds the file descriptor containing the // given symbol, finds all of its previously unsent transitive dependencies, // does marshaling on them, and returns the marshaled result. The given symbol @@ -446,7 +427,6 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio ErrorMessage: err.Error(), }, } - log.Printf("OH NO: %s", err) } else { out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{ AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{ //nolint:staticcheck // SA1019: we want to keep using v1alpha @@ -476,3 +456,28 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio } } } + +// getServices gets the unique list of services given a list of methods. +func (s *serverReflectionServer) getServices(methods []string) (svcs []string, fds []*dpb.FileDescriptorProto) { + registry, err := gogoproto.MergedRegistry() + if err != nil { + s.log.Error("unable to load merged registry", "err", err) + return nil, nil + } + seenSvc := map[protoreflect.FullName]struct{}{} + for _, methodName := range methods { + methodName = strings.Join(strings.Split(methodName[1:], "/"), ".") + md, err := registry.FindDescriptorByName(protoreflect.FullName(methodName)) + if err != nil { + s.log.Error("unable to load method descriptor", "method", methodName, "err", err) + continue + } + svc := md.(protoreflect.MethodDescriptor).Parent() + if _, seen := seenSvc[svc.FullName()]; !seen { + svcs = append(svcs, string(svc.FullName())) + file := svc.ParentFile() + fds = append(fds, protodesc.ToFileDescriptorProto(file)) + } + } + return +} diff --git a/server/v2/api/grpc/server.go b/server/v2/api/grpc/server.go index c5e31f5dfc71..e59bd99a83c0 100644 --- a/server/v2/api/grpc/server.go +++ b/server/v2/api/grpc/server.go @@ -2,11 +2,20 @@ package grpc import ( "context" + "errors" "fmt" + "io" "net" + "strconv" + "github.com/cosmos/gogoproto/proto" + "github.com/spf13/pflag" "github.com/spf13/viper" + "golang.org/x/exp/maps" "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" "cosmossdk.io/core/transaction" "cosmossdk.io/log" @@ -14,7 +23,12 @@ import ( "cosmossdk.io/server/v2/api/grpc/gogoreflection" ) -type GRPCServer[T transaction.Tx] struct { +const ( + BlockHeightHeader = "x-cosmos-block-height" + FlagAddress = "address" +) + +type Server[T transaction.Tx] struct { logger log.Logger config *Config cfgOptions []CfgOption @@ -23,32 +37,34 @@ type GRPCServer[T transaction.Tx] struct { } // New creates a new grpc server. -func New[T transaction.Tx](cfgOptions ...CfgOption) *GRPCServer[T] { - return &GRPCServer[T]{ +func New[T transaction.Tx](cfgOptions ...CfgOption) *Server[T] { + return &Server[T]{ cfgOptions: cfgOptions, } } // Init returns a correctly configured and initialized gRPC server. // Note, the caller is responsible for starting the server. -func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { +func (s *Server[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.Logger) error { cfg := s.Config().(*Config) if v != nil { if err := serverv2.UnmarshalSubConfig(v, s.Name(), &cfg); err != nil { return fmt.Errorf("failed to unmarshal config: %w", err) } } + methodsMap := appI.GetGPRCMethodsToMessageMap() grpcSrv := grpc.NewServer( grpc.ForceServerCodec(newProtoCodec(appI.InterfaceRegistry()).GRPCCodec()), grpc.MaxSendMsgSize(cfg.MaxSendMsgSize), grpc.MaxRecvMsgSize(cfg.MaxRecvMsgSize), + grpc.UnknownServiceHandler( + makeUnknownServiceHandler(methodsMap, appI.GetAppManager()), + ), ) - // appI.RegisterGRPCServer(grpcSrv) - // Reflection allows external clients to see what services and methods the gRPC server exposes. - gogoreflection.Register(grpcSrv) + gogoreflection.Register(grpcSrv, maps.Keys(methodsMap), logger.With("sub-module", "grpc-reflection")) s.grpcSrv = grpcSrv s.config = cfg @@ -57,11 +73,88 @@ func (s *GRPCServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger log.L return nil } -func (s *GRPCServer[T]) Name() string { +func (s *Server[T]) StartCmdFlags() *pflag.FlagSet { + flags := pflag.NewFlagSet("grpc", pflag.ExitOnError) + + // start flags are prefixed with the server name + // as the config in prefixed with the server name + // this allows viper to properly bind the flags + prefix := func(f string) string { + return fmt.Sprintf("%s.%s", s.Name(), f) + } + + flags.String(prefix(FlagAddress), "localhost:9090", "Listen address") + return flags +} + +func makeUnknownServiceHandler(messageMap map[string]func() proto.Message, querier interface { + Query(ctx context.Context, version uint64, msg proto.Message) (proto.Message, error) +}, +) grpc.StreamHandler { + return func(srv any, stream grpc.ServerStream) error { + method, ok := grpc.MethodFromServerStream(stream) + if !ok { + return status.Error(codes.InvalidArgument, "unable to get method") + } + makeMsg, exists := messageMap[method] + if !exists { + return status.Errorf(codes.Unimplemented, "gRPC method %s is not handled", method) + } + for { + req := makeMsg() + err := stream.RecvMsg(req) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + + // extract height header + ctx := stream.Context() + height, err := getHeightFromCtx(ctx) + if err != nil { + return status.Errorf(codes.InvalidArgument, "invalid get height from context: %v", err) + } + resp, err := querier.Query(ctx, height, req) + if err != nil { + return err + } + err = stream.SendMsg(resp) + if err != nil { + return err + } + } + } +} + +func getHeightFromCtx(ctx context.Context) (uint64, error) { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return 0, nil + } + values := md.Get(BlockHeightHeader) + if len(values) == 0 { + return 0, nil + } + if len(values) != 1 { + return 0, fmt.Errorf("gRPC height metadata must be of length 1, got: %d", len(values)) + } + + heightStr := values[0] + height, err := strconv.ParseUint(heightStr, 10, 64) + if err != nil { + return 0, fmt.Errorf("unable to parse height string from gRPC metadata %s: %w", heightStr, err) + } + + return height, nil +} + +func (s *Server[T]) Name() string { return "grpc" } -func (s *GRPCServer[T]) Config() any { +func (s *Server[T]) Config() any { if s.config == nil || s.config == (&Config{}) { cfg := DefaultConfig() // overwrite the default config with the provided options @@ -75,7 +168,7 @@ func (s *GRPCServer[T]) Config() any { return s.config } -func (s *GRPCServer[T]) Start(ctx context.Context) error { +func (s *Server[T]) Start(ctx context.Context) error { if !s.config.Enable { return nil } @@ -102,7 +195,7 @@ func (s *GRPCServer[T]) Start(ctx context.Context) error { return err } -func (s *GRPCServer[T]) Stop(ctx context.Context) error { +func (s *Server[T]) Stop(ctx context.Context) error { if !s.config.Enable { return nil } diff --git a/server/v2/cometbft/abci.go b/server/v2/cometbft/abci.go index 9a901e85b7a7..c055a80ff60e 100644 --- a/server/v2/cometbft/abci.go +++ b/server/v2/cometbft/abci.go @@ -41,7 +41,6 @@ type Consensus[T transaction.Tx] struct { streaming streaming.Manager snapshotManager *snapshots.Manager mempool mempool.Mempool[T] - grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error) // legacy support for gRPC cfg Config indexedEvents map[string]struct{} @@ -60,6 +59,8 @@ type Consensus[T transaction.Tx] struct { addrPeerFilter types.PeerFilter // filter peers by address and port idPeerFilter types.PeerFilter // filter peers by node ID + + grpcMethodsMap map[string]func() gogoproto.Message // maps gRPC method to message creator func } func NewConsensus[T transaction.Tx]( @@ -69,7 +70,7 @@ func NewConsensus[T transaction.Tx]( app *appmanager.AppManager[T], mp mempool.Mempool[T], indexedEvents map[string]struct{}, - grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error), + gRPCMethodsMap map[string]func() gogoproto.Message, store types.Store, cfg Config, txCodec transaction.Codec[T], @@ -78,7 +79,7 @@ func NewConsensus[T transaction.Tx]( appName: appName, version: getCometBFTServerVersion(), consensusAuthority: consensusAuthority, - grpcQueryDecoders: grpcQueryDecoders, + grpcMethodsMap: gRPCMethodsMap, app: app, cfg: cfg, store: store, @@ -173,9 +174,10 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc // It is called by cometbft to query application state. func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) { // check if it's a gRPC method - grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path] + makeGRPCRequest, isGRPC := c.grpcMethodsMap[req.Path] if isGRPC { - protoRequest, err := grpcQueryDecoder(req.Data) + protoRequest := makeGRPCRequest() + err = gogoproto.Unmarshal(req.Data, protoRequest) // TODO: use codec if err != nil { return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err) } diff --git a/server/v2/cometbft/server.go b/server/v2/cometbft/server.go index 1bb50e5b6767..cbb95ad6fb77 100644 --- a/server/v2/cometbft/server.go +++ b/server/v2/cometbft/server.go @@ -80,7 +80,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l appI.GetAppManager(), s.serverOptions.Mempool, indexEvents, - appI.GetGRPCQueryDecoders(), + appI.GetGPRCMethodsToMessageMap(), store, s.config, s.initTxCodec, diff --git a/server/v2/go.mod b/server/v2/go.mod index aaf1411f94c2..8f91d1e7301c 100644 --- a/server/v2/go.mod +++ b/server/v2/go.mod @@ -38,6 +38,7 @@ require ( github.com/spf13/pflag v1.0.5 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 + golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc golang.org/x/sync v0.7.0 google.golang.org/grpc v1.64.1 google.golang.org/protobuf v1.34.2 @@ -99,7 +100,6 @@ require ( github.com/tidwall/btree v1.7.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.25.0 // indirect - golang.org/x/exp v0.0.0-20240531132922-fd00a4e0eefc // indirect golang.org/x/net v0.27.0 // indirect golang.org/x/sys v0.22.0 // indirect golang.org/x/text v0.16.0 // indirect diff --git a/server/v2/server_test.go b/server/v2/server_test.go index 3faef417757b..e757e7ecd5ca 100644 --- a/server/v2/server_test.go +++ b/server/v2/server_test.go @@ -16,6 +16,7 @@ import ( "cosmossdk.io/log" serverv2 "cosmossdk.io/server/v2" grpc "cosmossdk.io/server/v2/api/grpc" + "cosmossdk.io/server/v2/appmanager" ) type mockInterfaceRegistry struct{} @@ -33,6 +34,14 @@ type mockApp[T transaction.Tx] struct { serverv2.AppI[T] } +func (*mockApp[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message { + return map[string]func() gogoproto.Message{} +} + +func (*mockApp[T]) GetAppManager() *appmanager.AppManager[T] { + return nil +} + func (*mockApp[T]) InterfaceRegistry() coreapp.InterfaceRegistry { return &mockInterfaceRegistry{} } diff --git a/server/v2/types.go b/server/v2/types.go index fc6caaaeb735..978b46b78810 100644 --- a/server/v2/types.go +++ b/server/v2/types.go @@ -17,6 +17,6 @@ type AppI[T transaction.Tx] interface { InterfaceRegistry() coreapp.InterfaceRegistry GetAppManager() *appmanager.AppManager[T] GetConsensusAuthority() string - GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) + GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message GetStore() any }