Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RSDK-8885] Allow DiscoverComponents API to play well with modules #4410

Merged
merged 8 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"
pb "go.viam.com/api/module/v1"
robotpb "go.viam.com/api/robot/v1"
"go.viam.com/utils"
"go.viam.com/utils/pexec"
"go.viam.com/utils/rpc"
Expand Down Expand Up @@ -75,8 +76,11 @@ type module struct {
handles modlib.HandlerMap
sharedConn rdkgrpc.SharedConn
client pb.ModuleServiceClient
addr string
resources map[resource.Name]*addedResource
// robotClient supplements the ModuleServiceClient client to serve select robot level methods from the module server
// such as the DiscoverComponents API
robotClient robotpb.RobotServiceClient
addr string
resources map[resource.Name]*addedResource
// resourcesMu must be held if the `resources` field is accessed without
// write-locking the module manager.
resourcesMu sync.Mutex
Expand Down Expand Up @@ -991,6 +995,7 @@ func (m *module) dial() error {
// out.
m.sharedConn.ResetConn(rpc.GrpcOverHTTPClientConn{ClientConn: conn}, m.logger)
m.client = pb.NewModuleServiceClient(m.sharedConn.GrpcConn())
m.robotClient = robotpb.NewRobotServiceClient(m.sharedConn.GrpcConn())
return nil
}

Expand Down Expand Up @@ -1163,6 +1168,10 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log
case api.API.IsComponent():
for _, model := range models {
logger.Infow("Registering component API and model from module", "module", m.cfg.Name, "API", api.API, "model", model)
// We must copy because the Discover closure func relies on api and model, but they are iterators and mutate.
hexbabe marked this conversation as resolved.
Show resolved Hide resolved
// Copying prevents mutation.
modelCopy := model
apiCopy := api
resource.RegisterComponent(api.API, model, resource.Registration[resource.Resource, resource.NoNativeConfig]{
Constructor: func(
ctx context.Context,
Expand All @@ -1172,6 +1181,21 @@ func (m *module) registerResources(mgr modmaninterface.ModuleManager, logger log
) (resource.Resource, error) {
return mgr.AddResource(ctx, conf, DepsToNames(deps))
},
Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) {
req := &robotpb.DiscoverComponentsRequest{
Queries: []*robotpb.DiscoveryQuery{
{Subtype: apiCopy.API.String(), Model: modelCopy.String()},
},
}

res, err := m.robotClient.DiscoverComponents(ctx, req)
if err != nil {
m.logger.Errorf("error in modular DiscoverComponents: %s", err)
return nil, err
}

return res, nil
},
})
}
case api.API.IsService():
Expand Down
64 changes: 64 additions & 0 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package modmanager

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -32,6 +33,20 @@ import (
rutils "go.viam.com/rdk/utils"
)

type testDiscoveryResult struct {
Discovery []testDiscoveryItem `json:"discovery"`
}

type testDiscoveryItem struct {
Query testDiscoveryQuery `json:"query"`
Results map[string]string `json:"results"`
}

type testDiscoveryQuery struct {
Subtype string `json:"subtype"`
Model string `json:"model"`
}

func setupSocketWithRobot(t *testing.T) string {
t.Helper()

Expand Down Expand Up @@ -1322,3 +1337,52 @@ func TestBadModuleFailsFast(t *testing.T) {

test.That(t, err.Error(), test.ShouldContainSubstring, "module test-module exited too quickly after attempted startup")
}

func TestModularDiscovery(t *testing.T) {
ctx := context.Background()
logger := logging.NewTestLogger(t)

modPath := rtestutils.BuildTempModule(t, "module/testmodule")

modCfg := config.Module{
Name: "test-module",
ExePath: modPath,
}

parentAddr := setupSocketWithRobot(t)

mgr := setupModManager(t, ctx, parentAddr, logger, modmanageroptions.Options{UntrustedEnv: false})

err := mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldBeNil)

// The helper model implements actual (foobar) discovery
reg, ok := resource.LookupRegistration(generic.API, resource.NewModel("rdk", "test", "helper"))
test.That(t, ok, test.ShouldBeTrue)
test.That(t, reg, test.ShouldNotBeNil)

// Check that the Discover function is registered and make call
test.That(t, reg.Discover, test.ShouldNotBeNil)
result, err := reg.Discover(ctx, logger)
test.That(t, err, test.ShouldBeNil)
t.Log("Discovery result: ", result)

// Format result
jsonData, err := json.Marshal(result)
test.That(t, err, test.ShouldBeNil)
// Debug: print the JSON data
t.Logf("Raw JSON: %s", string(jsonData))

var discoveryResult testDiscoveryResult
err = json.Unmarshal(jsonData, &discoveryResult)
test.That(t, err, test.ShouldBeNil)
// Debug: print the casted struct
t.Logf("Casted struct: %+v", discoveryResult)

// Test fields
test.That(t, len(discoveryResult.Discovery), test.ShouldEqual, 1)
discovery := discoveryResult.Discovery[0]
test.That(t, discovery.Query.Subtype, test.ShouldEqual, "rdk:component:generic")
test.That(t, discovery.Query.Model, test.ShouldEqual, "rdk:test:helper")
test.That(t, discovery.Results["foo"], test.ShouldEqual, "bar")
}
65 changes: 65 additions & 0 deletions module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"

Expand All @@ -24,6 +25,7 @@ import (
robotpb "go.viam.com/api/robot/v1"
streampb "go.viam.com/api/stream/v1"
"go.viam.com/utils"
vprotoutils "go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
Expand Down Expand Up @@ -191,6 +193,7 @@ type Module struct {
pcFailed <-chan struct{}
pb.UnimplementedModuleServiceServer
streampb.UnimplementedStreamServiceServer
robotpb.UnimplementedRobotServiceServer
}

// NewModule returns the basic module framework/structure.
Expand Down Expand Up @@ -230,6 +233,11 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
if err := m.server.RegisterServiceServer(ctx, &streampb.StreamService_ServiceDesc, m); err != nil {
return nil, err
}
// We register the RobotService API to supplement the ModuleService in order to serve select robot level methods from the module server
// such as the DiscoverComponents API
if err := m.server.RegisterServiceServer(ctx, &robotpb.RobotService_ServiceDesc, m); err != nil {
return nil, err
}

// attempt to construct a PeerConnection
pc, err := rgrpc.NewLocalPeerConnection(logger)
Expand Down Expand Up @@ -506,6 +514,63 @@ func (m *Module) AddResource(ctx context.Context, req *pb.AddResourceRequest) (*
return &pb.AddResourceResponse{}, nil
}

// DiscoverComponents takes a list of discovery queries and returns corresponding
// component configurations.
func (m *Module) DiscoverComponents(
ctx context.Context,
req *robotpb.DiscoverComponentsRequest,
) (*robotpb.DiscoverComponentsResponse, error) {
var discoveries []*robotpb.Discovery

for _, q := range req.Queries {
// Handle triplet edge case i.e. if the subtype doesn't contain ':', add the "rdk:component:" prefix
if !strings.ContainsRune(q.Subtype, ':') {
q.Subtype = "rdk:component:" + q.Subtype
}
Comment on lines +526 to +529
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think triplets are used/supported throughout now, so this guard might not be necessary here and in the corresponding version in robot/server/server.go.

anyway, not necessary to figure this out now since it doesn't block your change - we can follow up on this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha thanks for context

Added to maintain somewhat parallel logic to the server.go version

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added to maintain somewhat parallel logic to the server.go version

yup that's the right move - this should be in parity until we (netcode) address the logic in the robot server version of this function


api, err := resource.NewAPIFromString(q.Subtype)
if err != nil {
return nil, fmt.Errorf("invalid subtype: %s: %w", q.Subtype, err)
}
model, err := resource.NewModelFromString(q.Model)
if err != nil {
return nil, fmt.Errorf("invalid model: %s: %w", q.Model, err)
}

resInfo, ok := resource.LookupRegistration(api, model)
if !ok {
return nil, fmt.Errorf("no registration found for API %s and model %s", api, model)
}

if resInfo.Discover == nil {
return nil, fmt.Errorf("discovery not supported for API %s and model %s", api, model)
}

results, err := resInfo.Discover(ctx, m.logger)
if err != nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: %w", api, model, err)
}
if results == nil {
return nil, fmt.Errorf("error discovering components for API %s and model %s: results was nil", api, model)
}

pbResults, err := vprotoutils.StructToStructPb(results)
if err != nil {
return nil, fmt.Errorf("unable to convert discovery results to pb struct for query %v: %w", q, err)
}

pbDiscovery := &robotpb.Discovery{
Query: q,
Results: pbResults,
}
discoveries = append(discoveries, pbDiscovery)
}

return &robotpb.DiscoverComponentsResponse{
Discovery: discoveries,
}, nil
}

// ReconfigureResource receives the component/service configuration from the parent.
func (m *Module) ReconfigureResource(ctx context.Context, req *pb.ReconfigureResourceRequest) (*pb.ReconfigureResourceResponse, error) {
var res resource.Resource
Expand Down
9 changes: 8 additions & 1 deletion module/testmodule/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
resource.RegisterComponent(
generic.API,
helperModel,
resource.Registration[resource.Resource, resource.NoNativeConfig]{Constructor: newHelper})
resource.Registration[resource.Resource, resource.NoNativeConfig]{
Constructor: newHelper,
Discover: func(ctx context.Context, logger logging.Logger) (interface{}, error) {
return map[string]string{
"foo": "bar",
}, nil
},
})
err = myMod.AddModelFromRegistry(ctx, generic.API, helperModel)
if err != nil {
return err
Expand Down
Loading