Skip to content

Commit

Permalink
feat(serverv2): integrate gRPC (#21038)
Browse files Browse the repository at this point in the history
Co-authored-by: Marko <marko@baricevic.me>
Co-authored-by: marbar3778 <marbar3778@yahoo.com>
(cherry picked from commit 8b47141)

# Conflicts:
#	schema/appdata/mux_test.go
#	server/v2/cometbft/server.go
#	server/v2/go.mod
  • Loading branch information
testinginprod authored and mergify[bot] committed Jul 31, 2024
1 parent 3135f41 commit b4320ee
Show file tree
Hide file tree
Showing 11 changed files with 330 additions and 74 deletions.
8 changes: 4 additions & 4 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ type App[T transaction.Tx] struct {
amino legacy.Amino
moduleManager *MM[T]

// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
// GRPCMethodsToMessageMap maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
GRPCMethodsToMessageMap map[string]func() gogoproto.Message
}

// Name returns the app name.
Expand Down Expand Up @@ -118,6 +118,6 @@ func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
return a.GRPCQueryDecoders
func (a *App[T]) GetGPRCMethodsToMessageMap() map[string]func() gogoproto.Message {
return a.GRPCMethodsToMessageMap
}
17 changes: 10 additions & 7 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ func (m *MM[T]) assertNoForgottenModules(

func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
grpcQueryDecoders: map[string]func() gogoproto.Message{},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
Expand All @@ -567,7 +567,10 @@ func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], re
if err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
app.GRPCQueryDecoders = c.grpcQueryDecoders
// merge maps
for path, decoder := range c.grpcQueryDecoders {
app.GRPCMethodsToMessageMap[path] = decoder
}
return nil
}

Expand All @@ -576,7 +579,7 @@ var _ grpc.ServiceRegistrar = (*configurator)(nil)
type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)
grpcQueryDecoders map[string]func() gogoproto.Message

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
Expand Down Expand Up @@ -618,11 +621,11 @@ func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
return msg, gogoproto.Unmarshal(bytes, msg)
decoderFunc := func() gogoproto.Message {
return reflect.New(typ.Elem()).Interface().(gogoproto.Message)
}
c.grpcQueryDecoders[md.MethodName] = decoderFunc
methodName := fmt.Sprintf("/%s/%s", sd.ServiceName, md.MethodName)
c.grpcQueryDecoders[methodName] = decoderFunc
}
return nil
}
Expand Down
11 changes: 6 additions & 5 deletions runtime/v2/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,12 @@ func ProvideAppBuilder[T transaction.Tx](

msgRouterBuilder := stf.NewMsgRouterBuilder()
app := &App[T]{
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
storeKeys: nil,
interfaceRegistrar: interfaceRegistrar,
amino: amino,
msgRouterBuilder: msgRouterBuilder,
queryRouterBuilder: stf.NewMsgRouterBuilder(), // TODO dedicated query router
GRPCMethodsToMessageMap: map[string]func() proto.Message{},
}
appBuilder := &AppBuilder[T]{app: app}

Expand Down
133 changes: 133 additions & 0 deletions schema/appdata/mux_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package appdata

import (
"fmt"
"testing"
)

func TestListenerMux(t *testing.T) {
t.Run("empty", func(t *testing.T) {
listener := ListenerMux(Listener{}, Listener{})

if listener.InitializeModuleData != nil {
t.Error("expected nil")
}
if listener.StartBlock != nil {
t.Error("expected nil")
}
if listener.OnTx != nil {
t.Error("expected nil")
}
if listener.OnEvent != nil {
t.Error("expected nil")
}
if listener.OnKVPair != nil {
t.Error("expected nil")
}
if listener.OnObjectUpdate != nil {
t.Error("expected nil")
}
if listener.Commit != nil {
t.Error("expected nil")
}
})

t.Run("all called once", func(t *testing.T) {
var calls []string
onCall := func(name string, i int, _ Packet) {
calls = append(calls, fmt.Sprintf("%s %d", name, i))
}

res := ListenerMux(callCollector(1, onCall), callCollector(2, onCall))

callAllCallbacksOnces(t, res)

checkExpectedCallOrder(t, calls, []string{
"InitializeModuleData 1",
"InitializeModuleData 2",
"StartBlock 1",
"StartBlock 2",
"OnTx 1",
"OnTx 2",
"OnEvent 1",
"OnEvent 2",
"OnKVPair 1",
"OnKVPair 2",
"OnObjectUpdate 1",
"OnObjectUpdate 2",
"Commit 1",
"Commit 2",
})
})
}

func callAllCallbacksOnces(t *testing.T, listener Listener) {
t.Helper()
if err := listener.InitializeModuleData(ModuleInitializationData{}); err != nil {
t.Error(err)
}
if err := listener.StartBlock(StartBlockData{}); err != nil {
t.Error(err)
}
if err := listener.OnTx(TxData{}); err != nil {
t.Error(err)
}
if err := listener.OnEvent(EventData{}); err != nil {
t.Error(err)
}
if err := listener.OnKVPair(KVPairData{}); err != nil {
t.Error(err)
}
if err := listener.OnObjectUpdate(ObjectUpdateData{}); err != nil {
t.Error(err)
}
if err := listener.Commit(CommitData{}); err != nil {
t.Error(err)
}
}

func callCollector(i int, onCall func(string, int, Packet)) Listener {
return Listener{
InitializeModuleData: func(ModuleInitializationData) error {
onCall("InitializeModuleData", i, nil)
return nil
},
StartBlock: func(StartBlockData) error {
onCall("StartBlock", i, nil)
return nil
},
OnTx: func(TxData) error {
onCall("OnTx", i, nil)
return nil
},
OnEvent: func(EventData) error {
onCall("OnEvent", i, nil)
return nil
},
OnKVPair: func(KVPairData) error {
onCall("OnKVPair", i, nil)
return nil
},
OnObjectUpdate: func(ObjectUpdateData) error {
onCall("OnObjectUpdate", i, nil)
return nil
},
Commit: func(CommitData) error {
onCall("Commit", i, nil)
return nil
},
}
}

func checkExpectedCallOrder(t *testing.T, actual, expected []string) {
t.Helper()
if len(actual) != len(expected) {
t.Fatalf("expected %d calls, got %d", len(expected), len(actual))
}

for i := range actual {
if actual[i] != expected[i] {
t.Errorf("expected %q, got %q", expected[i], actual[i])
}
}
}
87 changes: 46 additions & 41 deletions server/v2/api/grpc/gogoreflection/serverreflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,33 +42,41 @@ import (
"errors"
"fmt"
"io"
"log"
"reflect"
"sort"
"strings"
"sync"

//nolint: staticcheck // keep this import for backward compatibility
"github.com/golang/protobuf/proto"
gogoproto "github.com/cosmos/gogoproto/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"

"cosmossdk.io/core/log"
)

type serverReflectionServer struct {
rpb.UnimplementedServerReflectionServer
s *grpc.Server

methods []string

initSymbols sync.Once
serviceNames []string
symbols map[string]*dpb.FileDescriptorProto // map of fully-qualified names to files
log log.Logger
}

// Register registers the server reflection service on the given gRPC server.
func Register(s *grpc.Server) {
func Register(s *grpc.Server, methods []string, logger log.Logger) {
rpb.RegisterServerReflectionServer(s, &serverReflectionServer{
s: s,
s: s,
methods: methods,
log: logger,
})
}

Expand All @@ -82,21 +90,12 @@ type protoMessage interface {

func (s *serverReflectionServer) getSymbols() (svcNames []string, symbolIndex map[string]*dpb.FileDescriptorProto) {
s.initSymbols.Do(func() {
serviceInfo := s.s.GetServiceInfo()

s.symbols = map[string]*dpb.FileDescriptorProto{}
s.serviceNames = make([]string, 0, len(serviceInfo))
services, fds := s.getServices(s.methods)
s.serviceNames = services

processed := map[string]struct{}{}
for svc, info := range serviceInfo {
s.serviceNames = append(s.serviceNames, svc)
fdenc, ok := parseMetadata(info.Metadata)
if !ok {
continue
}
fd, err := decodeFileDesc(fdenc)
if err != nil {
continue
}
for _, fd := range fds {
s.processFile(fd, processed)
}
sort.Strings(s.serviceNames)
Expand Down Expand Up @@ -207,7 +206,7 @@ func decodeFileDesc(enc []byte) (*dpb.FileDescriptorProto, error) {
}

fd := new(dpb.FileDescriptorProto)
if err := proto.Unmarshal(raw, fd); err != nil {
if err := gogoproto.Unmarshal(raw, fd); err != nil {
return nil, fmt.Errorf("bad descriptor: %w", err)
}
return fd, nil
Expand Down Expand Up @@ -237,7 +236,7 @@ func typeForName(name string) (reflect.Type, error) {
}

func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescriptorProto, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
Expand All @@ -252,7 +251,7 @@ func fileDescContainingExtension(st reflect.Type, ext int32) (*dpb.FileDescripto
}

func (s *serverReflectionServer) allExtensionNumbersForType(st reflect.Type) ([]int32, error) {
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(proto.Message)
m, ok := reflect.Zero(reflect.PtrTo(st)).Interface().(gogoproto.Message)
if !ok {
return nil, fmt.Errorf("failed to create message from type: %v", st)
}
Expand All @@ -272,7 +271,7 @@ func fileDescWithDependencies(fd *dpb.FileDescriptorProto, sentFileDescriptors m
queue = queue[1:]
if sent := sentFileDescriptors[currentfd.GetName()]; len(r) == 0 || !sent {
sentFileDescriptors[currentfd.GetName()] = true
currentfdEncoded, err := proto.Marshal(currentfd)
currentfdEncoded, err := gogoproto.Marshal(currentfd)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -305,24 +304,6 @@ func (s *serverReflectionServer) fileDescEncodingByFilename(name string, sentFil
return fileDescWithDependencies(fd, sentFileDescriptors)
}

// parseMetadata finds the file descriptor bytes specified meta.
// For SupportPackageIsVersion4, m is the name of the proto file, we
// call proto.FileDescriptor to get the byte slice.
// For SupportPackageIsVersion3, m is a byte slice itself.
func parseMetadata(meta interface{}) ([]byte, bool) {
// Check if meta is the file name.
if fileNameForMeta, ok := meta.(string); ok {
return getFileDescriptor(fileNameForMeta), true
}

// Check if meta is the byte slice.
if enc, ok := meta.([]byte); ok {
return enc, true
}

return nil, false
}

// fileDescEncodingContainingSymbol finds the file descriptor containing the
// given symbol, finds all of its previously unsent transitive dependencies,
// does marshaling on them, and returns the marshaled result. The given symbol
Expand Down Expand Up @@ -446,7 +427,6 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio
ErrorMessage: err.Error(),
},
}
log.Printf("OH NO: %s", err)
} else {
out.MessageResponse = &rpb.ServerReflectionResponse_AllExtensionNumbersResponse{
AllExtensionNumbersResponse: &rpb.ExtensionNumberResponse{ //nolint:staticcheck // SA1019: we want to keep using v1alpha
Expand Down Expand Up @@ -476,3 +456,28 @@ func (s *serverReflectionServer) ServerReflectionInfo(stream rpb.ServerReflectio
}
}
}

// getServices gets the unique list of services given a list of methods.
func (s *serverReflectionServer) getServices(methods []string) (svcs []string, fds []*dpb.FileDescriptorProto) {
registry, err := gogoproto.MergedRegistry()
if err != nil {
s.log.Error("unable to load merged registry", "err", err)
return nil, nil
}
seenSvc := map[protoreflect.FullName]struct{}{}
for _, methodName := range methods {
methodName = strings.Join(strings.Split(methodName[1:], "/"), ".")
md, err := registry.FindDescriptorByName(protoreflect.FullName(methodName))
if err != nil {
s.log.Error("unable to load method descriptor", "method", methodName, "err", err)
continue
}
svc := md.(protoreflect.MethodDescriptor).Parent()
if _, seen := seenSvc[svc.FullName()]; !seen {
svcs = append(svcs, string(svc.FullName()))
file := svc.ParentFile()
fds = append(fds, protodesc.ToFileDescriptorProto(file))
}
}
return
}
Loading

0 comments on commit b4320ee

Please sign in to comment.