Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ RSDK-7903 Cache machine status in robot client #4399

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 47 additions & 26 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type RobotClient struct {
dialOptions []rpc.DialOption

mu sync.RWMutex
resourceNames []resource.Name
cachedMachineStatus robot.MachineStatus
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be *robot.MachineStatus to indicate a possible state where we haven't cached anything? Would mean you didn't have to return robot.MachineStatus{} in some of your error cases and could just return nil.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be *robot.MachineStatus to indicate a possible state where we haven't cached anything?

agreed - it would be helpful to at least make the internal cachedMachineStatus field a pointer to better express the case where nothing was cached yet.

however...

Would mean you didn't have to return robot.MachineStatus{} in some of your error cases and could just return nil.

the MachineStatus interface method specifies a struct return value instead of a struct pointer, so that method would still have to return MachineStatus{} values. we can revisit this interface typing decision if folks are strongly against it. I wanted to avoid having to do worry about excessive nil checking on MachineStatus, but depending on how large that structure grows it might be useful to make it a pointer for performance reasons.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it a pointer for performance reasons

discussed offline with @dgottlieb - the resource.Status objects, whose fields are most likely to expand in the short-term, are in slice and therefore stored on the heap and don't create extra work when copying.

there's no immediate plans to add any fields to the top-level struct - perhaps we can revisit this interface type if/when that changes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah fair; thanks for the response here! Leaving as-is seems fine to me given your points.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cool, as-is for both the internal cachedMachineStatus field as well as the public method?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel too strongly. I guess I noticed everything was robot.MachineStatus and not *robot.MachineStatus, and felt it to be odd. Given your reasoning for keeping the public method as robot.MachineStatus, I think either value or pointer is fine for the cached value.

resourceRPCAPIs []resource.RPCAPI
resourceClients map[resource.Name]resource.Resource
remoteNameMap map[resource.Name]resource.Name
Expand Down Expand Up @@ -415,8 +415,8 @@ func (rc *RobotClient) connectWithLock(ctx context.Context) error {
func (rc *RobotClient) updateResourceClients(ctx context.Context) error {
activeResources := make(map[resource.Name]bool)

for _, name := range rc.resourceNames {
activeResources[name] = true
for _, rs := range rc.cachedMachineStatus.Resources {
activeResources[rs.Name] = true
}

for resourceName, client := range rc.resourceClients {
Expand Down Expand Up @@ -464,7 +464,7 @@ func (rc *RobotClient) checkConnection(ctx context.Context, checkEvery, reconnec
return err
}
} else {
if _, _, err := rc.resources(ctx); err != nil {
if _, _, err := rc.machineStatusAndRPCAPIs(ctx); err != nil {
return err
}
}
Expand Down Expand Up @@ -586,8 +586,8 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er
}

// finally, before adding a new resource, make sure this name exists and is known
for _, knownName := range rc.resourceNames {
if name == knownName {
for _, rs := range rc.cachedMachineStatus.Resources {
if name == rs.Name {
resourceClient, err := rc.createClient(name)
if err != nil {
return nil, err
Expand All @@ -608,7 +608,7 @@ func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, erro
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger)
}

func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) {
func (rc *RobotClient) machineStatusAndRPCAPIs(ctx context.Context) (robot.MachineStatus, []resource.RPCAPI, error) {
// RSDK-5356 If we are in a testing environment, never apply
// defaultResourcesTimeout. Tests run in parallel, and if execution of a test
// pauses for longer than 5s, below calls to ResourceNames or
Expand All @@ -620,22 +620,29 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
defer cancel()
}

resp, err := rc.client.ResourceNames(ctx, &pb.ResourceNamesRequest{})
resp, err := rc.machineStatus(ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heads up @benjirewis @cheukt

spoke IRL with @dgottlieb - we can't move forward with this change, since it's backwards breaking. specifically, this will break comms between two machines if one is running a version has this this endpoint but the other is running a version that does not.

making this transition will require a different strategy to account for this risk, if we still decide to make it.

if err != nil {
return nil, nil, err
return robot.MachineStatus{}, nil, err
}

var resTypes []resource.RPCAPI

resources := make([]resource.Name, 0, len(resp.Resources))
for _, name := range resp.Resources {
newName := rprotoutils.ResourceNameFromProto(name)
resources = append(resources, newName)
mStatus := resp
resources := make([]resource.Status, 0, len(resp.Resources))
for _, rs := range resp.Resources {
if rs.Name.API == RemoteAPI {
continue
}
if rs.Name.API.Type.Namespace == resource.APINamespaceRDKInternal {
continue
}
resources = append(resources, rs)
}
mStatus.Resources = resources

// resource has previously returned an unimplemented response, skip rpc call
if rc.rpcSubtypesUnimplemented {
return resources, resTypes, nil
return mStatus, resTypes, nil
}

typesResp, err := rc.client.ResourceRPCSubtypes(ctx, &pb.ResourceRPCSubtypesRequest{})
Expand All @@ -656,7 +663,7 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
}
svcDesc, ok := symDesc.(*desc.ServiceDescriptor)
if !ok {
return nil, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
return robot.MachineStatus{}, nil, fmt.Errorf("expected descriptor to be service descriptor but got %T", symDesc)
}
resTypes = append(resTypes, resource.RPCAPI{
API: rprotoutils.ResourceNameFromProto(resAPI.Subtype).API,
Expand All @@ -665,13 +672,13 @@ func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resour
}
} else {
if s, ok := status.FromError(err); !(ok && (s.Code() == codes.Unimplemented)) {
return nil, nil, err
return robot.MachineStatus{}, nil, err
}
// prevent future calls to ResourceRPCSubtypes
rc.rpcSubtypesUnimplemented = true
}

return resources, resTypes, nil
return mStatus, resTypes, nil
}

// Refresh manually updates the underlying parts of this machine.
Expand All @@ -686,13 +693,12 @@ func (rc *RobotClient) Refresh(ctx context.Context) (err error) {
func (rc *RobotClient) updateResources(ctx context.Context) error {
// call metadata service.

names, rpcAPIs, err := rc.resources(ctx)
mStatus, rpcAPIs, err := rc.machineStatusAndRPCAPIs(ctx)
if err != nil && status.Code(err) != codes.Unimplemented {
return fmt.Errorf("error updating resources: %w", err)
}

rc.resourceNames = make([]resource.Name, 0, len(names))
rc.resourceNames = append(rc.resourceNames, names...)
rc.cachedMachineStatus = mStatus
rc.resourceRPCAPIs = rpcAPIs

rc.updateRemoteNameMap()
Expand All @@ -703,17 +709,17 @@ func (rc *RobotClient) updateResources(ctx context.Context) error {
func (rc *RobotClient) updateRemoteNameMap() {
tempMap := make(map[resource.Name]resource.Name)
dupMap := make(map[resource.Name]bool)
for _, n := range rc.resourceNames {
if err := n.Validate(); err != nil {
for _, rs := range rc.cachedMachineStatus.Resources {
if err := rs.Name.Validate(); err != nil {
rc.Logger().Error(err)
continue
}
tempName := resource.RemoveRemoteName(n)
tempName := resource.RemoveRemoteName(rs.Name)
// If the short name already exists in the map then there is a collision and we make the long name empty.
if _, ok := tempMap[tempName]; ok {
dupMap[tempName] = true
} else {
tempMap[tempName] = n
tempMap[tempName] = rs.Name
}
}
for key := range dupMap {
Expand Down Expand Up @@ -759,8 +765,10 @@ func (rc *RobotClient) ResourceNames() []resource.Name {
}
rc.mu.RLock()
defer rc.mu.RUnlock()
names := make([]resource.Name, 0, len(rc.resourceNames))
names = append(names, rc.resourceNames...)
names := make([]resource.Name, 0, len(rc.cachedMachineStatus.Resources))
for _, rs := range rc.cachedMachineStatus.Resources {
names = append(names, rs.Name)
}
return names
}

Expand Down Expand Up @@ -1058,8 +1066,21 @@ func (rc *RobotClient) Shutdown(ctx context.Context) error {
return nil
}

// ErrDisconnected that a robot is disconnected.
var ErrDisconnected = errors.New("disconnected")

// MachineStatus returns the current status of the robot.
func (rc *RobotClient) MachineStatus(ctx context.Context) (robot.MachineStatus, error) {
if rc.checkConnected() != nil {
return robot.MachineStatus{}, ErrDisconnected
}

rc.mu.Lock()
defer rc.mu.Unlock()
return rc.cachedMachineStatus, nil
}

func (rc *RobotClient) machineStatus(ctx context.Context) (robot.MachineStatus, error) {
mStatus := robot.MachineStatus{}

req := &pb.GetMachineStatusRequest{}
Expand Down
1 change: 1 addition & 0 deletions robot/client/client_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ var exemptFromSession = map[string]bool{
"/proto.rpc.webrtc.v1.SignalingService/OptionalWebRTCConfig": true,
"/proto.rpc.v1.AuthService/Authenticate": true,
"/proto.rpc.v1.ExternalAuthService/AuthenticateTo": true,
"/viam.robot.v1.RobotService/GetMachineStatus": true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since we are now using MachineStatus for inter-robot communication alongside ResourceNames, we need to exempt it from session safety monitoring like we do for ResourceNames

"/viam.robot.v1.RobotService/ResourceNames": true,
"/viam.robot.v1.RobotService/ResourceRPCSubtypes": true,
"/viam.robot.v1.RobotService/StartSession": true,
Expand Down
18 changes: 15 additions & 3 deletions robot/client/client_session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.viam.com/rdk/robot/client"
"go.viam.com/rdk/robot/web"
"go.viam.com/rdk/session"
rdktestutils "go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/inject"
"go.viam.com/rdk/testutils/robottestutils"
)
Expand Down Expand Up @@ -101,8 +102,12 @@ func TestClientSessionOptions(t *testing.T) {

sessMgr := &sessionManager{}
arbName := resource.NewName(echoAPI, "woo")
injectResources := []resource.Name{arbName}
injectRobot := &inject.Robot{
ResourceNamesFunc: func() []resource.Name { return []resource.Name{arbName} },
ResourceNamesFunc: func() []resource.Name { return injectResources },
MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) {
return rdktestutils.ResourcesToMachineStatus(injectResources), nil
Comment on lines +107 to +109
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the bulk of test updates in this PR involve injecting MachineStatus function to test robot clients, since this API replaces ResourceNames as the means of sourcing resources

},
ResourceByNameFunc: func(name resource.Name) (resource.Resource, error) {
return &dummyEcho{Named: arbName.AsNamed()}, nil
},
Expand Down Expand Up @@ -284,8 +289,12 @@ func TestClientSessionExpiration(t *testing.T) {
arbName := resource.NewName(echoAPI, "woo")

var dummyEcho1 dummyEcho
injectResources := []resource.Name{arbName}
injectRobot := &inject.Robot{
ResourceNamesFunc: func() []resource.Name { return []resource.Name{arbName} },
ResourceNamesFunc: func() []resource.Name { return injectResources },
MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) {
return rdktestutils.ResourcesToMachineStatus(injectResources), nil
},
ResourceByNameFunc: func(name resource.Name) (resource.Resource, error) {
return &dummyEcho1, nil
},
Expand Down Expand Up @@ -478,7 +487,10 @@ func TestClientSessionResume(t *testing.T) {

sessMgr := &sessionManager{}
injectRobot := &inject.Robot{
ResourceNamesFunc: func() []resource.Name { return []resource.Name{} },
ResourceNamesFunc: func() []resource.Name { return []resource.Name{} },
MachineStatusFunc: func(ctx context.Context) (robot.MachineStatus, error) {
return robot.MachineStatus{}, nil
},
ResourceRPCAPIsFunc: func() []resource.RPCAPI { return nil },
LoggerFunc: func() logging.Logger { return logger },
SessMgr: sessMgr,
Expand Down
Loading
Loading