Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC reflection v1 #3338

Merged
merged 2 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions js/modules/k6/grpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,18 @@ func TestClient(t *testing.T) {
code: `client.connect("GRPCBIN_ADDR", {reflect: true})`,
},
},
{
name: "ReflectV1",
setup: func(tb *httpmultibin.HTTPMultiBin) {
reflection.RegisterV1(tb.ServerGRPC)
},
initString: codeBlock{
code: `var client = new grpc.Client();`,
},
vuString: codeBlock{
code: `client.connect("GRPCBIN_ADDR", {reflect: true})`,
},
},
{
name: "ReflectBadParam",
setup: func(tb *httpmultibin.HTTPMultiBin) {
Expand Down
105 changes: 0 additions & 105 deletions lib/netext/grpcext/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
"sync/atomic"
"testing"

"github.com/jhump/protoreflect/desc/protoparse"
Expand All @@ -14,12 +13,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protodesc"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/dynamicpb"
)

Expand Down Expand Up @@ -157,107 +153,6 @@ func TestConnInvokeInvalid(t *testing.T) {
}
}

func TestResolveFileDescriptors(t *testing.T) {
t.Parallel()

tests := []struct {
name string
pkgs []string
services []string
expectedDescriptors int
}{
{
name: "SuccessSamePackage",
pkgs: []string{"mypkg"},
services: []string{"Service1", "Service2", "Service3"},
expectedDescriptors: 3,
},
{
name: "SuccessMultiPackages",
pkgs: []string{"mypkg1", "mypkg2", "mypkg3"},
services: []string{"Service", "Service", "Service"},
expectedDescriptors: 3,
},
{
name: "DeduplicateServices",
pkgs: []string{"mypkg1"},
services: []string{"Service1", "Service2", "Service1"},
expectedDescriptors: 2,
},
{
name: "NoServices",
services: []string{},
expectedDescriptors: 0,
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var (
lsr = &reflectpb.ListServiceResponse{}
mock = &getServiceFileDescriptorMock{}
)
for i, service := range tt.services {
// if only one package is defined then
// the package is the same for every service
pkg := tt.pkgs[0]
if len(tt.pkgs) > 1 {
pkg = tt.pkgs[i]
}

lsr.Service = append(lsr.Service, &reflectpb.ServiceResponse{
Name: fmt.Sprintf("%s.%s", pkg, service),
})
mock.pkgs = append(mock.pkgs, pkg)
mock.names = append(mock.names, service)
}

rc := reflectionClient{}
fdset, err := rc.resolveServiceFileDescriptors(mock, lsr)
require.NoError(t, err)
assert.Len(t, fdset.File, tt.expectedDescriptors)
})
}
}

type getServiceFileDescriptorMock struct {
pkgs []string
names []string
nreqs int64
}

func (m *getServiceFileDescriptorMock) Send(req *reflectpb.ServerReflectionRequest) error {
// TODO: check that the sent message is expected,
// otherwise return an error
return nil
}

func (m *getServiceFileDescriptorMock) Recv() (*reflectpb.ServerReflectionResponse, error) {
n := atomic.AddInt64(&m.nreqs, 1)
ptr := func(s string) (sptr *string) {
return &s
}
index := n - 1
fdp := &descriptorpb.FileDescriptorProto{
Package: ptr(m.pkgs[index]),
Name: ptr(m.names[index]),
}
b, err := proto.Marshal(fdp)
if err != nil {
return nil, err
}
srr := &reflectpb.ServerReflectionResponse{
MessageResponse: &reflectpb.ServerReflectionResponse_FileDescriptorResponse{
FileDescriptorResponse: &reflectpb.FileDescriptorResponse{
FileDescriptorProto: [][]byte{b},
},
},
}
return srr, nil
}

func methodFromProto(method string) protoreflect.MethodDescriptor {
path := "any-path"
parser := protoparse.Parser{
Expand Down
92 changes: 22 additions & 70 deletions lib/netext/grpcext/reflect.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"context"
"fmt"

"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"google.golang.org/grpc"
reflectpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
)

Expand All @@ -18,97 +18,49 @@ type reflectionClient struct {
// Reflect will use the grpc reflection api to make the file descriptors available to request.
// It is called in the connect function the first time the Client.Connect function is called.
func (rc *reflectionClient) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, error) {
client := reflectpb.NewServerReflectionClient(rc.Conn)
methodClient, err := client.ServerReflectionInfo(ctx)
if err != nil {
return nil, fmt.Errorf("can't get server info: %w", err)
}
req := &reflectpb.ServerReflectionRequest{
MessageRequest: &reflectpb.ServerReflectionRequest_ListServices{},
}
resp, err := sendReceive(methodClient, req)
client := grpcreflect.NewClientAuto(ctx, rc.Conn)

services, err := client.ListServices()
if err != nil {
return nil, fmt.Errorf("can't list services: %w", err)
}
listResp := resp.GetListServicesResponse()
if listResp == nil {
return nil, fmt.Errorf("can't list services, nil response")
}
fdset, err := rc.resolveServiceFileDescriptors(methodClient, listResp)
if err != nil {
return nil, fmt.Errorf("can't resolve services' file descriptors: %w", err)
}
return fdset, nil
}

func (rc *reflectionClient) resolveServiceFileDescriptors(
client sendReceiver,
res *reflectpb.ListServiceResponse,
) (*descriptorpb.FileDescriptorSet, error) {
services := res.GetService()
seen := make(map[fileDescriptorLookupKey]bool, len(services))
fdset := &descriptorpb.FileDescriptorSet{
File: make([]*descriptorpb.FileDescriptorProto, 0, len(services)),
}

for _, service := range services {
req := &reflectpb.ServerReflectionRequest{
MessageRequest: &reflectpb.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: service.GetName(),
},
}
resp, err := sendReceive(client, req)
for _, srv := range services {
srvDescriptor, err := client.ResolveService(srv)
if err != nil {
return nil, fmt.Errorf("can't get method on service %q: %w", service, err)
return nil, fmt.Errorf("can't get method on service %q: %w", srv, err)
}
fdResp := resp.GetFileDescriptorResponse()
for _, raw := range fdResp.GetFileDescriptorProto() {
var fdp descriptorpb.FileDescriptorProto
if err = proto.Unmarshal(raw, &fdp); err != nil {
return nil, fmt.Errorf("can't unmarshal proto on service %q: %w", service, err)
}

stack := []*desc.FileDescriptor{srvDescriptor.GetFile()}

for len(stack) > 0 {
fdp := stack[len(stack)-1]
stack = stack[:len(stack)-1]

fdkey := fileDescriptorLookupKey{
Package: *fdp.Package,
Name: *fdp.Name,
Package: fdp.GetPackage(),
Name: fdp.GetName(),
}

stack = append(stack, fdp.GetDependencies()...)

if seen[fdkey] {
// When a proto file contains declarations for multiple services
// then the same proto file is returned multiple times,
// this prevents adding the returned proto file as a duplicate.
continue
}
seen[fdkey] = true
fdset.File = append(fdset.File, &fdp)
fdset.File = append(fdset.File, fdp.AsFileDescriptorProto())
}
}
return fdset, nil
}

// sendReceiver is a smaller interface for decoupling
// from `reflectpb.ServerReflection_ServerReflectionInfoClient`,
// that has the dependency from `grpc.ClientStream`,
// which is too much in the case the requirement is to just make a reflection's request.
// It makes the API more restricted and with a controlled surface,
// in this way the testing should be easier also.
type sendReceiver interface {
Send(*reflectpb.ServerReflectionRequest) error
Recv() (*reflectpb.ServerReflectionResponse, error)
}

// sendReceive sends a request to a reflection client and,
// receives a response.
func sendReceive(
client sendReceiver,
req *reflectpb.ServerReflectionRequest,
) (*reflectpb.ServerReflectionResponse, error) {
if err := client.Send(req); err != nil {
return nil, fmt.Errorf("can't send request: %w", err)
}
resp, err := client.Recv()
if err != nil {
return nil, fmt.Errorf("can't receive response: %w", err)
}
return resp, nil
return fdset, nil
}

type fileDescriptorLookupKey struct {
Expand Down