RSDK-4716 - allow for graceful handling of failed modules (#2853)
Co-authored-by: Benjamin Rewis <32186188+benjirewis@users.noreply.github.com>
Co-authored-by: Cheuk <90270663+cheukt@users.noreply.github.com>
3 people authored and 10zingpd committed Sep 12, 2023
1 parent d593e93 commit ffe5045
Showing 3 changed files with 76 additions and 26 deletions.
36 changes: 26 additions & 10 deletions module/modmanager/manager.go
@@ -43,13 +43,16 @@ var (

// NewManager returns a Manager.
func NewManager(parentAddr string, logger golog.Logger, options modmanageroptions.Options) modmaninterface.ModuleManager {
+ restartCtx, restartCtxCancel := context.WithCancel(context.Background())
return &Manager{
logger: logger,
modules: map[string]*module{},
parentAddr: parentAddr,
rMap: map[resource.Name]*module{},
untrustedEnv: options.UntrustedEnv,
removeOrphanedResources: options.RemoveOrphanedResources,
+ restartCtx: restartCtx,
+ restartCtxCancel: restartCtxCancel,
}
}
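
Aside (not part of the commit): the change above introduces a manager-owned restart context, created once in NewManager and cancelled in Close, so that any restart work derived from it stops when the manager shuts down. A minimal, self-contained sketch of that pattern, with hypothetical names:

package main

import (
	"context"
	"fmt"
	"time"
)

// manager owns a context that outlives individual calls but not the manager itself.
type manager struct {
	restartCtx       context.Context
	restartCtxCancel context.CancelFunc
}

func newManager() *manager {
	ctx, cancel := context.WithCancel(context.Background())
	return &manager{restartCtx: ctx, restartCtxCancel: cancel}
}

// Close cancels restartCtx so in-flight restart attempts observe cancellation.
func (m *manager) Close() {
	if m.restartCtxCancel != nil {
		m.restartCtxCancel()
	}
}

// attemptRestart gives up as soon as restartCtx is cancelled.
func (m *manager) attemptRestart() error {
	select {
	case <-m.restartCtx.Done():
		return m.restartCtx.Err()
	case <-time.After(100 * time.Millisecond): // stand-in for a slow process start
		return nil
	}
}

func main() {
	m := newManager()
	m.Close()                                           // shut down before/while a restart would run
	fmt.Println("restart result:", m.attemptRestart()) // prints "context canceled"
}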

@@ -67,14 +70,14 @@ type module struct
// pendingRemoval allows delaying module close until after resources within it are closed
pendingRemoval bool

- // inRecovery stores whether or not an OnUnexpectedExit function is trying
- // to recover a crash of this module; inRecoveryLock guards the execution of
- // an OnUnexpectedExit function for this module.
+ // inStartup stores whether or not the manager of the OnUnexpectedExit function
+ // is trying to start up this module; inRecoveryLock guards the execution of an
+ // OnUnexpectedExit function for this module.
//
// NOTE(benjirewis): Using just an atomic boolean is not sufficient, as OUE
// functions for the same module cannot overlap and should not continue after
// another OUE has finished.
- inRecovery atomic.Bool
+ inStartup atomic.Bool
inRecoveryLock sync.Mutex
}

@@ -92,12 +95,18 @@ type Manager struct {
rMap map[resource.Name]*module
untrustedEnv bool
removeOrphanedResources func(ctx context.Context, rNames []resource.Name)
+ restartCtx context.Context
+ restartCtxCancel context.CancelFunc
}

// Close terminates module connections and processes.
func (mgr *Manager) Close(ctx context.Context) error {
mgr.mu.Lock()
defer mgr.mu.Unlock()

+ if mgr.restartCtxCancel != nil {
+ mgr.restartCtxCancel()
+ }
var err error
for _, mod := range mgr.modules {
err = multierr.Combine(err, mgr.remove(mod, false))
@@ -130,14 +139,21 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, conn *grpc.Clie
resources: map[resource.Name]*addedResource{},
}

+ // add calls startProcess, which can also be called by the OUE handler in the attemptRestart
+ // call. Both of these involve owning a lock, so in unhappy cases of malformed modules
+ // this can lead to a deadlock. To prevent this, we set inStartup here to indicate to
+ // the OUE handler that it shouldn't act while add is still processing.
+ mod.inStartup.Store(true)
+ defer mod.inStartup.Store(false)

var success bool
defer func() {
if !success {
mod.cleanupAfterStartupFailure(mgr, false)
}
}()

- if err := mod.startProcess(ctx, mgr.parentAddr,
+ if err := mod.startProcess(mgr.restartCtx, mgr.parentAddr,
mgr.newOnUnexpectedExitHandler(mod), mgr.logger); err != nil {
return errors.WithMessage(err, "error while starting module "+mod.name)
}
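
Aside (illustrative only, not part of the diff): the inStartup flag set at the top of add is the same flag the exit handler checks under inRecoveryLock, so a module that crashes while add still holds the manager lock does not trigger a concurrent restart that would need that same lock. A simplified, synchronous sketch of that guard, with hypothetical names:

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

type module struct {
	inStartup      atomic.Bool // true while startup or recovery owns the module
	inRecoveryLock sync.Mutex  // serializes exit handlers for this module
}

type manager struct {
	mu sync.Mutex
}

// onUnexpectedExit bails out while Add (or another handler) owns the module,
// so it never tries to take mgr.mu while Add is still holding it.
func (mgr *manager) onUnexpectedExit(mod *module) bool {
	mod.inRecoveryLock.Lock()
	defer mod.inRecoveryLock.Unlock()
	if mod.inStartup.Load() {
		return false
	}
	mod.inStartup.Store(true)
	defer mod.inStartup.Store(false)
	mgr.mu.Lock() // safe here: Add is not in progress for this module
	defer mgr.mu.Unlock()
	// ... attempt a restart ...
	return true
}

// Add marks the module as "in startup" for its whole duration.
func (mgr *manager) Add(mod *module) error {
	mgr.mu.Lock()
	defer mgr.mu.Unlock()
	mod.inStartup.Store(true)
	defer mod.inStartup.Store(false)
	// Simulate a malformed module that dies during startup: in this sketch the
	// handler runs inline; without the flag it would block forever on mgr.mu.
	acted := mgr.onUnexpectedExit(mod)
	return fmt.Errorf("error while starting module (handler acted: %v): %w",
		acted, errors.New("process exited"))
}

func main() {
	mgr := &manager{}
	fmt.Println(mgr.Add(&module{})) // handler acted: false
}
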
@@ -440,15 +456,15 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) bool {
return func(exitCode int) bool {
mod.inRecoveryLock.Lock()
defer mod.inRecoveryLock.Unlock()
- if mod.inRecovery.Load() {
+ if mod.inStartup.Load() {
return false
}

- mod.inRecovery.Store(true)
- defer mod.inRecovery.Store(false)
+ mod.inStartup.Store(true)
+ defer mod.inStartup.Store(false)

// Use oueTimeout for entire attempted module restart.
- ctx, cancel := context.WithTimeout(context.Background(), oueTimeout)
+ ctx, cancel := context.WithTimeout(mgr.restartCtx, oueTimeout)
defer cancel()

// Log error immediately, as this is unexpected behavior.
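
Aside (illustrative, hypothetical names): deriving the per-recovery timeout from the manager's restart context rather than from context.Background() means that closing the manager cuts an in-flight recovery short, while a running manager still bounds each recovery by oueTimeout. A small sketch:

package main

import (
	"context"
	"fmt"
	"time"
)

const oueTimeout = time.Minute // stand-in for the real timeout value

func main() {
	// restartCtx lives for the lifetime of the manager; Close cancels it.
	restartCtx, restartCtxCancel := context.WithCancel(context.Background())

	// Each recovery attempt gets a deadline derived from restartCtx: it ends
	// when the timeout elapses or when the manager closes, whichever is first.
	ctx, cancel := context.WithTimeout(restartCtx, oueTimeout)
	defer cancel()

	go func() {
		time.Sleep(10 * time.Millisecond)
		restartCtxCancel() // simulate Manager.Close during a recovery attempt
	}()

	<-ctx.Done()
	fmt.Println(ctx.Err()) // "context canceled", not a one-minute wait
}
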
@@ -521,7 +537,7 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.

// Attempt to restart module process 3 times.
for attempt := 1; attempt < 4; attempt++ {
- if err := mod.startProcess(ctx, mgr.parentAddr,
+ if err := mod.startProcess(mgr.restartCtx, mgr.parentAddr,
mgr.newOnUnexpectedExitHandler(mod), mgr.logger); err != nil {
mgr.logger.Errorf("attempt %d: error while restarting crashed module %s: %v",
attempt, mod.name, err)
45 changes: 45 additions & 0 deletions module/modmanager/manager_test.go
@@ -602,3 +602,48 @@ func TestDebugModule(t *testing.T) {
})
}
}

func TestGracefulShutdownWithMalformedModule(t *testing.T) {
// This test ensures that module manager's `Add` can be interrupted by a `Close`
// call correctly, and no OUE restart goroutines will continue beyond their `inStartup`
// check. With our current design, `local_robot.Reconfigure` blocks the main thread,
// so the manager will not be `Closed` while a module is being `Add`ed. Future work
// (RSDK-4854) may change that though.
logger, logs := golog.NewObservedTestLogger(t)
// Precompile module to avoid timeout issues when building takes too long.
modPath, err := rtestutils.BuildTempModule(t, "module/testmodule")
test.That(t, err, test.ShouldBeNil)

modCfg := config.Module{
Name: "test-module",
ExePath: modPath,
LogLevel: "info",
}

// This cannot use t.TempDir() as the path it gives on MacOS exceeds module.MaxSocketAddressLength.
parentAddr, err := os.MkdirTemp("", "viam-test-*")
test.That(t, err, test.ShouldBeNil)
defer os.RemoveAll(parentAddr)
parentAddr += "/parent.sock"

mgr := NewManager(parentAddr, logger, modmanageroptions.Options{UntrustedEnv: false})

channel := make(chan struct{})
go func() {
err = mgr.Add(context.Background(), modCfg)
channel <- struct{}{}
}()
// close the mgr so we can confirm that `Add` still finishes, despite manager being closed
err = mgr.Close(context.Background())
test.That(t, err, test.ShouldBeNil)

// Confirm that the call to `Add` has completed and `err` has been set to its return value
<-channel
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "error while starting module test-module")

// check that the OUE handler hasn't been called at this point
// (we closed the mgr before `Add` hits its normal timeout). At any rate, the OUE handler
// will always exit quickly without doing anything so long as `Add` is mid-call.
test.That(t, logs.FilterMessageSnippet("module has unexpectedly exited").Len(), test.ShouldEqual, 0)
}
21 changes: 5 additions & 16 deletions robot/impl/resource_manager.go
@@ -586,11 +586,11 @@ func (manager *resourceManager) completeConfig(
gNode.SetLastError(errors.Wrap(err, "resource build error"))
return
}
- // if the ctxWithTimeout has an error then that means we've timed out. This means
- // that resource generation is running async, and we don't currently have good
- // validation around how this might affect the resource graph. So, we avoid updating
- // the graph to be safe.
- if ctxWithTimeout.Err() != nil {
+ // if the ctxWithTimeout fails with DeadlineExceeded, then that means that
+ // resource generation is running async, and we don't currently have good
+ // validation around how this might affect the resource graph. So, we avoid
+ // updating the graph to be safe.
+ if errors.Is(ctxWithTimeout.Err(), context.DeadlineExceeded) {
manager.logger.Errorw("error building resource", "resource", conf.ResourceName(), "model", conf.Model, "error", ctxWithTimeout.Err())
} else {
gNode.SwapResource(newRes, conf.Model)
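
Aside (not part of the diff): the distinction the new check relies on is that ctx.Err() is non-nil for both cancellation and deadline expiry, but only context.DeadlineExceeded means the per-resource build timeout actually fired; a cancellation for some other reason (for example, shutdown) is a different condition and, per the new comment, should not be reported as an async build timeout. A minimal sketch:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// classify distinguishes a deadline expiry from any other context error.
func classify(ctx context.Context) string {
	switch {
	case errors.Is(ctx.Err(), context.DeadlineExceeded):
		return "build timed out; resource may still be constructing asynchronously"
	case ctx.Err() != nil:
		return "context ended for another reason (e.g. cancellation)"
	default:
		return "context still active"
	}
}

func main() {
	timedOut, cancelTimeout := context.WithTimeout(context.Background(), time.Millisecond)
	defer cancelTimeout()
	time.Sleep(5 * time.Millisecond) // let the deadline pass

	cancelled, cancelNow := context.WithCancel(context.Background())
	cancelNow()

	fmt.Println(classify(timedOut))  // deadline-exceeded case
	fmt.Println(classify(cancelled)) // cancellation case
}
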
@@ -918,17 +918,6 @@ func (manager *resourceManager) updateResources(
}

// modules are not added into the resource tree as they belong to the module manager
- for _, mod := range conf.Added.Modules {
- // this is done in config validation but partial start rules require us to check again
- if err := mod.Validate(""); err != nil {
- manager.logger.Errorw("module config validation error; skipping", "module", mod.Name, "error", err)
- continue
- }
- if err := manager.moduleManager.Add(ctx, mod); err != nil {
- manager.logger.Errorw("error adding module", "module", mod.Name, "error", err)
- continue
- }
- }
for _, mod := range conf.Modified.Modules {
// this is done in config validation but partial start rules require us to check again
if err := mod.Validate(""); err != nil {
