Skip to content

Commit

Permalink
RSDK-7903 Keep unhealthy remotes in ResourceNames (#4421)
Browse files Browse the repository at this point in the history
  • Loading branch information
maximpertsov authored Oct 7, 2024
1 parent 8c5a136 commit 216563f
Show file tree
Hide file tree
Showing 7 changed files with 279 additions and 43 deletions.
11 changes: 11 additions & 0 deletions resource/graph_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ type GraphNode struct {
// stored on the revision field.
pendingRevision string
revision string

// unreachable is an informational field that indicates if a resource on a remote
// machine is disconnected.
unreachable bool
}

var (
Expand Down Expand Up @@ -345,6 +349,13 @@ func (w *GraphNode) UpdateRevision(revision string) {
}
}

// markReachability records whether this node is currently reachable by
// storing the negation of reachable in the unreachable field. The write is
// performed while holding w.mu.
func (w *GraphNode) markReachability(reachable bool) {
w.mu.Lock()
defer w.mu.Unlock()

w.unreachable = !reachable
}

// SetNewConfig is used to inform the node that it has been modified
// and requires a reconfiguration. If the node was previously marked for removal,
// this unmarks it.
Expand Down
40 changes: 39 additions & 1 deletion resource/resource_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,22 @@ func (g *Graph) Names() []Name {
return names
}

// ReachableNames returns all resource graph names, excluding nodes that are
// currently marked as unreachable.
//
// The returned slice contains exactly one entry per reachable node and is
// never nil. Map iteration order is unspecified, so the order of the names is
// not stable between calls.
func (g *Graph) ReachableNames() []Name {
	g.mu.Lock()
	defer g.mu.Unlock()

	// Start at length zero (capacity for the worst case) and append, so that
	// skipped unreachable nodes do not leave zero-value Names at the tail of
	// the result.
	names := make([]Name, 0, len(g.nodes))
	for k, node := range g.nodes {
		if node.unreachable {
			continue
		}
		names = append(names, k)
	}
	return names
}

// FindNodesByShortNameAndAPI will look for resources matching both the API and the name.
func (g *Graph) FindNodesByShortNameAndAPI(name Name) []Name {
g.mu.Lock()
Expand Down Expand Up @@ -624,10 +640,17 @@ func (g *Graph) isNodeDependingOn(node, child Name) bool {
return g.transitiveClosureMatrix[child][node] != 0
}

// SubGraphFrom returns a Sub-Graph containing all linked dependencies starting with node [Name].
// It holds g.mu for the duration of the call and delegates the actual work to
// subGraphFromWithMutex, whose result and error are returned unchanged.
func (g *Graph) SubGraphFrom(node Name) (*Graph, error) {
g.mu.Lock()
defer g.mu.Unlock()

return g.subGraphFromWithMutex(node)
}

// subGraphFromWithMutex returns a Sub-Graph containing all linked dependencies starting with node [Name].
// This method is NOT threadsafe: A client must hold [Graph.mu] while calling this method.
func (g *Graph) subGraphFromWithMutex(node Name) (*Graph, error) {
if _, ok := g.nodes[node]; !ok {
return nil, errors.Errorf("cannot create sub-graph from non existing node %q ", node.Name)
}
Expand All @@ -641,6 +664,21 @@ func (g *Graph) SubGraphFrom(node Name) (*Graph, error) {
return subGraph, nil
}

// MarkReachability flags every node in the sub-graph rooted at the given
// [Name] as reachable (reachable == true) or unreachable (reachable == false).
//
// The graph mutex is held for the whole operation. If the sub-graph cannot be
// built, that error is returned and no node is modified.
func (g *Graph) MarkReachability(node Name, reachable bool) error {
	g.mu.Lock()
	defer g.mu.Unlock()

	affected, err := g.subGraphFromWithMutex(node)
	if err != nil {
		return err
	}

	// Use a distinct loop variable name so the Name parameter is not shadowed.
	for _, gNode := range affected.nodes {
		gNode.markReachability(reachable)
	}
	return nil
}

// Status returns a slice of all graph node statuses.
func (g *Graph) Status() []Status {
g.mu.Lock()
Expand Down
114 changes: 78 additions & 36 deletions robot/impl/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,31 @@ func (manager *resourceManager) updateRemoteResourceNames(
rr internalRemoteRobot,
recreateAllClients bool,
) bool {
manager.logger.CDebugw(ctx, "updating remote resource names", "remote", remoteName, "recreateAllClients", recreateAllClients)
logger := manager.logger.WithFields("remote", remoteName)
logger.CDebugw(ctx, "updating remote resource names", "recreateAllClients", recreateAllClients)
activeResourceNames := map[resource.Name]bool{}
newResources := rr.ResourceNames()

// The connection to the remote is broken. In this case, we mark each resource node
// on this remote as disconnected but do not report any other changes.
if newResources == nil {
err := manager.resources.MarkReachability(remoteName, false)
if err != nil {
logger.Error(
"unable to mark remote resources as unreachable",
"error", err,
)
}
return false
}

err := manager.resources.MarkReachability(remoteName, true)
if err != nil {
logger.Error(
"unable to mark remote resources as reachable",
"error", err,
)
}
oldResources := manager.remoteResourceNames(remoteName)
for _, res := range oldResources {
activeResourceNames[res] = false
Expand All @@ -194,15 +216,14 @@ func (manager *resourceManager) updateRemoteResourceNames(

for _, resName := range newResources {
remoteResName := resName
resLogger := logger.WithFields("resource", remoteResName)
res, err := rr.ResourceByName(remoteResName) // this returns a remote known OR foreign resource client
if err != nil {
if errors.Is(err, client.ErrMissingClientRegistration) {
manager.logger.CDebugw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
resLogger.CDebugw(ctx, "couldn't obtain remote resource interface",
"reason", err)
} else {
manager.logger.CErrorw(ctx, "couldn't obtain remote resource interface",
"name", remoteResName,
resLogger.CErrorw(ctx, "couldn't obtain remote resource interface",
"reason", err)
}
continue
Expand All @@ -220,18 +241,16 @@ func (manager *resourceManager) updateRemoteResourceNames(
continue
}
// reconfiguration attempt, remote could have changed, so close all duplicate name remote resource clients and readd new ones later
manager.logger.CDebugw(ctx, "attempting to remove remote resource", "name", resName)
resLogger.CDebugw(ctx, "attempting to remove remote resource")
if err := manager.markChildrenForUpdate(resName); err != nil {
manager.logger.CErrorw(ctx,
resLogger.CErrorw(ctx,
"failed to mark children of remote resource for update",
"resource", resName,
"reason", err)
continue
}
if err := gNode.Close(ctx); err != nil {
manager.logger.CErrorw(ctx,
resLogger.CErrorw(ctx,
"failed to close remote resource node",
"resource", resName,
"reason", err)
}
}
Expand All @@ -242,49 +261,45 @@ func (manager *resourceManager) updateRemoteResourceNames(
} else {
gNode = resource.NewConfiguredGraphNode(resource.Config{}, res, unknownModel)
if err := manager.resources.AddNode(resName, gNode); err != nil {
manager.logger.CErrorw(ctx, "failed to add remote resource node", "name", resName, "error", err)
resLogger.CErrorw(ctx, "failed to add remote resource node", "error", err)
}
}

err = manager.resources.AddChild(resName, remoteName)
if err != nil {
manager.logger.CErrorw(ctx,
"error while trying add node as a dependency of remote",
"node", resName,
"remote", remoteName)
resLogger.CErrorw(ctx,
"error while trying add node as a dependency of remote")
} else {
anythingChanged = true
}
if anythingChanged {
manager.logger.CDebugw(ctx, "remote resource names update completed with changes to resource graph", "remote", remoteName)
} else {
manager.logger.CDebugw(ctx, "remote resource names update completed with no changes to resource graph", "remote", remoteName)
}
}

if anythingChanged {
logger.CDebugw(ctx, "remote resource names update completed with changes to resource graph")
} else {
logger.CDebugw(ctx, "remote resource names update completed with no changes to resource graph")
}

for resName, isActive := range activeResourceNames {
if isActive {
continue
}
manager.logger.CDebugw(ctx, "attempting to remove remote resource", "name", resName)
resLogger := logger.WithFields("resource", resName)
resLogger.CDebugw(ctx, "attempting to remove remote resource")
gNode, ok := manager.resources.Node(resName)
if !ok || gNode.IsUninitialized() {
manager.logger.CDebugw(ctx,
"remote resource already removed",
"resource", resName)
resLogger.CDebugw(ctx, "remote resource already removed")
continue
}
if err := manager.markChildrenForUpdate(resName); err != nil {
manager.logger.CErrorw(ctx,
resLogger.CErrorw(ctx,
"failed to mark children of remote resource for update",
"resource", resName,
"reason", err)
continue
}
if err := gNode.Close(ctx); err != nil {
manager.logger.CErrorw(ctx,
resLogger.CErrorw(ctx,
"failed to close remote resource node",
"resource", resName,
"reason", err)
}
anythingChanged = true
Expand Down Expand Up @@ -352,23 +367,50 @@ func (manager *resourceManager) internalResourceNames() []resource.Name {
return names
}

// ResourceNames returns the names of all resources in the manager.
// ResourceNames returns the names of all resources in the manager, excluding the following types of resources:
// - Resources that represent entire remote machines.
// - Resources that are considered internal to viam-server that cannot be removed via configuration.
func (manager *resourceManager) ResourceNames() []resource.Name {
names := []resource.Name{}
for _, k := range manager.resources.Names() {
if k.API == client.RemoteAPI ||
k.API.Type.Namespace == resource.APINamespaceRDKInternal {
continue
if manager.resourceName(k) {
names = append(names, k)
}
gNode, ok := manager.resources.Node(k)
if !ok || !gNode.HasResource() {
continue
}
return names
}

// reachableResourceNames returns the names of all resources in the manager, excluding the following types of resources:
//   - Resources that represent entire remote machines.
//   - Resources that are considered internal to viam-server that cannot be removed via configuration.
//   - Remote resources that are currently unreachable.
//
// Candidates come from the graph's ReachableNames and are filtered through
// resourceName. Each accepted name is appended exactly once; the returned
// slice is never nil.
func (manager *resourceManager) reachableResourceNames() []resource.Name {
	names := []resource.Name{}
	for _, k := range manager.resources.ReachableNames() {
		if manager.resourceName(k) {
			names = append(names, k)
		}
	}
	return names
}

// resourceName reports whether the given [resource.Name] is eligible to be listed by [ResourceNames].
// It returns false when any of the following holds:
//   - The name's API is the remote API, i.e. it represents an entire remote machine.
//   - The name's API namespace is the RDK-internal namespace.
//   - The resource graph has no node for the name, or that node holds no resource.
func (manager *resourceManager) resourceName(k resource.Name) bool {
	// Cheap checks on the name itself come first, before touching the graph.
	switch {
	case k.API == client.RemoteAPI:
		return false
	case k.API.Type.Namespace == resource.APINamespaceRDKInternal:
		return false
	}

	gNode, found := manager.resources.Node(k)
	return found && gNode.HasResource()
}

// ResourceRPCAPIs returns the types of all resource RPC APIs in use by the manager.
func (manager *resourceManager) ResourceRPCAPIs() []resource.RPCAPI {
resourceAPIs := resource.RegisteredAPIs()
Expand Down
Loading

0 comments on commit 216563f

Please sign in to comment.