Skip to content

Commit

Permalink
Add robot client to manager; Add robot server to module; Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
hexbabe committed Oct 1, 2024
1 parent d8a1647 commit ba2a303
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 0 deletions.
20 changes: 20 additions & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
pb "go.viam.com/api/module/v1"
robotpb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -75,6 +76,9 @@ type module struct {
handles modlib.HandlerMap
sharedConn rdkgrpc.SharedConn
client pb.ModuleServiceClient
// robotClient was added to supplement the ModuleServiceClient client to serve select robot level methods from the module server
// such as the DiscoverComponents API
robotClient robotpb.RobotServiceClient
addr string
resources map[resource.Name]*addedResource
// resourcesMu must be held if the `resources` field is accessed without
Expand Down Expand Up @@ -991,6 +995,7 @@ func (m *module) dial() error {
// out.
m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger)
m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn())
m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn())
return nil
}

Expand Down Expand Up @@ -1172,6 +1177,21 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log
) (resource.Resource, error) {
return mgr.AddResource(ctx, conf, DepsToNames(deps))
},
Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) {
req := &robotpb.DiscoverComponentsRequest{
Queries: []*robotpb.DiscoveryQuery{
{Subtype: api.API.String(), Model: model.String()},
},
}

res, err := m.robotClient.DiscoverComponents(ctx, req)
if err != nil {
m.logger.Errorf("error in modular DiscoverComponents: %w", err)
return nil, err
}

return res, nil
},
})
}
case api.API.IsService():
Expand Down
32 changes: 32 additions & 0 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/motor"
"go.viam.com/rdk/config"
"go.viam.com/rdk/examples/customresources/apis/gizmoapi"
"go.viam.com/rdk/logging"
modlib "go.viam.com/rdk/module"
modmanageroptions "go.viam.com/rdk/module/modmanager/options"
Expand Down Expand Up @@ -1322,3 +1323,34 @@ func TestBadModuleFailsFast(t *testing.T) {

test.That(t, err.Error(), test.ShouldContainSubstring, "module test-module exited too quickly after attempted startup")
}

func TestModuleDiscoverRegistered(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)

// Precompile module to avoid timeout issues when building takes too long.
modPath := rtestutils.BuildTempModule(t, "examples/customresources/demos/complexmodule")

modCfg := config.Module{
Name: "complex-module",
ExePath: modPath,
}

parentAddr := setupSocketWithRobot(t)

mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{UntrustedEnv: false})

err := mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldBeNil)

// Retrieve the registration for the gizmo model.
api := gizmoapi.API
model := resource.NewModel("acme", "demo", "mygizmo")

reg, ok := resource.LookupRegistration(api, model)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, reg, test.ShouldNotBeNil)

// Check that the Discover function is registered.
test.That(t, reg.Discover, test.ShouldNotBeNil)
}
60 changes: 60 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -38,6 +39,7 @@ import (
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot/client"
rutils "go.viam.com/rdk/utils"
vprotoutils "go.viam.com/utils/protoutils"
)

const (
Expand Down Expand Up @@ -191,6 +193,7 @@ type Module struct {
pcFailed <-chan struct{}
pb.UnimplementedModuleServiceServer
streampb.UnimplementedStreamServiceServer
robotpb.UnimplementedRobotServiceServer
}

// NewModule returns the basic module framework/structure.
Expand Down Expand Up @@ -230,6 +233,11 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
if err := m.server.RegisterServiceServer(ctx, &streampb.StreamService_ServiceDesc, m); err != nil {
return nil, err
}
// We register the RobotService API to supplement the ModuleService in order to serve select robot level methods from the module server
// such as the DiscoverComponents API
if err := m.server.RegisterServiceServer(ctx, &robotpb.RobotService_ServiceDesc, m); err != nil {
return nil, err
}

// attempt to construct a PeerConnection
pc, err := rgrpc.NewLocalPeerConnection(logger)
Expand Down Expand Up @@ -506,6 +514,58 @@ func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (*
return &pb.AddResourceResponse{}, nil
}

func (m *Module) DiscoverComponents(ctx context.Context, req *robotpb.DiscoverComponentsRequest) (*robotpb.DiscoverComponentsResponse, error) {
var discoveries []*robotpb.Discovery

for _, q := range req.Queries {
// Handle triplet edge case i.e. if the subtype doesn't contain ':', add the "rdk:component:" prefix
if !strings.ContainsRune(q.Subtype, ':') {
q.Subtype = "rdk:component:" + q.Subtype
}

api, err := resource.NewAPIFromString(q.Subtype)
if err != nil {
return nil, fmt.Errorf("invalid subtype: %s: %w", q.Subtype, err)
}
model, err := resource.NewModelFromString(q.Model)
if err != nil {
return nil, fmt.Errorf("invalid model: %s: %w", q.Model, err)
}

resInfo, ok := resource.LookupRegistration(api, model)
if !ok {
m.logger.Warnf("no registration found for API %s and model %s", api, model)
}

if resInfo.Discover == nil {
m.logger.Warnf("discovery not supported for API %s and model %s", api, model)
}

results, err := resInfo.Discover(ctx, m.logger)
if err != nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: %w", api, model, err)
}
if results == nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: results was nil", api, model)
}

pbResults, err := vprotoutils.StructToStructPb(results)
if err != nil {
return nil, fmt.Errorf("unable to convert discovery results to pb struct for query %v: %w", q, err)
}

pbDiscovery := &robotpb.Discovery{
Query: q,
Results: pbResults,
}
discoveries = append(discoveries, pbDiscovery)
}

return &robotpb.DiscoverComponentsResponse{
Discovery: discoveries,
}, nil
}

// ReconfigureResource receives the component/service configuration from the parent.
func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) {
var res resource.Resource
Expand Down

0 comments on commit ba2a303

Please sign in to comment.