From ba2a303dca973befbce6ee7075f8e4d79a4bec7d Mon Sep 17 00:00:00 2001 From: hexbabe Date: Tue, 1 Oct 2024 15:45:14 -0400 Subject: [PATCH] Add robot client to manager; Add robot server to module; Add test --- module/modmanager/manager.go | 20 +++++++++++ module/modmanager/manager_test.go | 32 +++++++++++++++++ module/module.go | 60 +++++++++++++++++++++++++++++++ 3 files changed, 112 insertions(+) diff --git a/module/modmanager/manager.go b/module/modmanager/manager.go index 87f9fe9826ef..9d582e3b7807 100644 --- a/module/modmanager/manager.go +++ b/module/modmanager/manager.go @@ -18,6 +18,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" pb "go.viam.com/api/module/v1" + robotpb "go.viam.com/api/robot/v1" "go.viam.com/utils" "go.viam.com/utils/pexec" "go.viam.com/utils/rpc" @@ -75,6 +76,9 @@ type module struct { handles modlib.HandlerMap sharedConn rdkgrpc.SharedConn client pb.ModuleServiceClient + // robotClient was added to supplement the ModuleServiceClient client to serve select robot level methods from the module server + // such as the DiscoverComponents API + robotClient robotpb.RobotServiceClient addr string resources map[resource.Name]*addedResource // resourcesMu must be held if the `resources` field is accessed without @@ -991,6 +995,7 @@ func (m *module) dial() error { // out. m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger) m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn()) + m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn()) return nil } @@ -1172,6 +1177,21 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log ) (resource.Resource, error) { return mgr.AddResource(ctx, conf, DepsToNames(deps)) }, + Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) { + req := &robotpb.DiscoverComponentsRequest{ + Queries: []*robotpb.DiscoveryQuery{ + {Subtype: api.API.String(), Model: model.String()}, + }, + } + + res, err := m.robotClient.DiscoverComponents(ctx, req) + if err != nil { + m.logger.Errorf("error in modular DiscoverComponents: %w", err) + return nil, err + } + + return res, nil + }, }) } case api.API.IsService(): diff --git a/module/modmanager/manager_test.go b/module/modmanager/manager_test.go index 2d8953a2aaee..85897476ec66 100644 --- a/module/modmanager/manager_test.go +++ b/module/modmanager/manager_test.go @@ -23,6 +23,7 @@ import ( "go.viam.com/rdk/components/generic" "go.viam.com/rdk/components/motor" "go.viam.com/rdk/config" + "go.viam.com/rdk/examples/customresources/apis/gizmoapi" "go.viam.com/rdk/logging" modlib "go.viam.com/rdk/module" modmanageroptions "go.viam.com/rdk/module/modmanager/options" @@ -1322,3 +1323,34 @@ func TestBadModuleFailsFast(t *testing.T) { test.That(t, err.Error(), test.ShouldContainSubstring, "module test-module exited too quickly after attempted startup") } + +func TestModuleDiscoverRegistered(t *testing.T) { + ctx := context.Background() + logger := logging.NewTestLogger(t) + + // Precompile module to avoid timeout issues when building takes too long. + modPath := rtestutils.BuildTempModule(t, "examples/customresources/demos/complexmodule") + + modCfg := config.Module{ + Name: "complex-module", + ExePath: modPath, + } + + parentAddr := setupSocketWithRobot(t) + + mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{UntrustedEnv: false}) + + err := mgr.Add(ctx, modCfg) + test.That(t, err, test.ShouldBeNil) + + // Retrieve the registration for the gizmo model. + api := gizmoapi.API + model := resource.NewModel("acme", "demo", "mygizmo") + + reg, ok := resource.LookupRegistration(api, model) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, reg, test.ShouldNotBeNil) + + // Check that the Discover function is registered. + test.That(t, reg.Discover, test.ShouldNotBeNil) +} diff --git a/module/module.go b/module/module.go index 13eb58e1db76..d022f803768e 100644 --- a/module/module.go +++ b/module/module.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "time" @@ -38,6 +39,7 @@ import ( "go.viam.com/rdk/resource" "go.viam.com/rdk/robot/client" rutils "go.viam.com/rdk/utils" + vprotoutils "go.viam.com/utils/protoutils" ) const ( @@ -191,6 +193,7 @@ type Module struct { pcFailed <-chan struct{} pb.UnimplementedModuleServiceServer streampb.UnimplementedStreamServiceServer + robotpb.UnimplementedRobotServiceServer } // NewModule returns the basic module framework/structure. @@ -230,6 +233,11 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod if err := m.server.RegisterServiceServer(ctx, &streampb.StreamService_ServiceDesc, m); err != nil { return nil, err } + // We register the RobotService API to supplement the ModuleService in order to serve select robot level methods from the module server + // such as the DiscoverComponents API + if err := m.server.RegisterServiceServer(ctx, &robotpb.RobotService_ServiceDesc, m); err != nil { + return nil, err + } // attempt to construct a PeerConnection pc, err := rgrpc.NewLocalPeerConnection(logger) @@ -506,6 +514,58 @@ func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (* return &pb.AddResourceResponse{}, nil } +func (m *Module) DiscoverComponents(ctx context.Context, req *robotpb.DiscoverComponentsRequest) (*robotpb.DiscoverComponentsResponse, error) { + var discoveries []*robotpb.Discovery + + for _, q := range req.Queries { + // Handle triplet edge case i.e. if the subtype doesn't contain ':', add the "rdk:component:" prefix + if !strings.ContainsRune(q.Subtype, ':') { + q.Subtype = "rdk:component:" + q.Subtype + } + + api, err := resource.NewAPIFromString(q.Subtype) + if err != nil { + return nil, fmt.Errorf("invalid subtype: %s: %w", q.Subtype, err) + } + model, err := resource.NewModelFromString(q.Model) + if err != nil { + return nil, fmt.Errorf("invalid model: %s: %w", q.Model, err) + } + + resInfo, ok := resource.LookupRegistration(api, model) + if !ok { + m.logger.Warnf("no registration found for API %s and model %s", api, model) + } + + if resInfo.Discover == nil { + m.logger.Warnf("discovery not supported for API %s and model %s", api, model) + } + + results, err := resInfo.Discover(ctx, m.logger) + if err != nil { + return nil, fmt.Errorf("error discovering components for API %s and model %s: %w", api, model, err) + } + if results == nil { + return nil, fmt.Errorf("error discovering components for API %s and model %s: results was nil", api, model) + } + + pbResults, err := vprotoutils.StructToStructPb(results) + if err != nil { + return nil, fmt.Errorf("unable to convert discovery results to pb struct for query %v: %w", q, err) + } + + pbDiscovery := &robotpb.Discovery{ + Query: q, + Results: pbResults, + } + discoveries = append(discoveries, pbDiscovery) + } + + return &robotpb.DiscoverComponentsResponse{ + Discovery: discoveries, + }, nil +} + // ReconfigureResource receives the component/service configuration from the parent. func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) { var res resource.Resource