Skip to content

Commit

Permalink
RSDK-3329, RSDK-4070 - Do not restart modules that crash before ready…
Browse files Browse the repository at this point in the history
… and respect resource configuration timeout (#2645)
  • Loading branch information
benjirewis authored and EshaMaharishi committed Jul 25, 2023
1 parent 4dabc8b commit 31753fb
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 37 deletions.
56 changes: 43 additions & 13 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, conn *grpc.Clie
name: conf.Name,
exe: conf.ExePath,
logLevel: conf.LogLevel,
conn: conn,
resources: map[resource.Name]*addedResource{},
}
mgr.modules[conf.Name] = mod
Expand All @@ -144,12 +145,12 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, conn *grpc.Clie
}
}()

// dial will re-use conn if it's non-nil (module being added in a Reconfigure).
if err := mod.dial(conn); err != nil {
// dial will re-use mod.conn if it's non-nil (module being added in a Reconfigure).
if err := mod.dial(); err != nil {
return errors.WithMessage(err, "error while dialing module "+mod.name)
}

if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
if err := mod.checkReady(ctx, mgr.parentAddr, mgr.logger); err != nil {
return errors.WithMessage(err, "error while waiting for module to be ready "+mod.name)
}

Expand Down Expand Up @@ -444,6 +445,36 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b
if mod.inRecovery.Load() {
return false
}

// If mod.handles was never set, the module was never actually ready in the
// first place before crashing. Log an error and do not attempt a restart;
// something is likely wrong with the module implementation.
mgr.mu.Lock()
if mod.handles == nil {
mgr.logger.Errorw(
"module has unexpectedly exited without responding to a ready request ",
"module", mod.name,
"exit_code", exitCode,
)
// Remove module and close connection. Process will already be stopped.
for r, m := range mgr.rMap {
if m == mod {
delete(mgr.rMap, r)
}
}
delete(mgr.modules, mod.name)
if mod.conn != nil {
if err := mod.conn.Close(); err != nil {
mgr.logger.Errorw("error while closing connection from crashed module",
"module", mod.name,
"error", err)
}
}
mgr.mu.Unlock()
return false
}
mgr.mu.Unlock()

mod.inRecovery.Store(true)
defer mod.inRecovery.Store(false)

Expand Down Expand Up @@ -558,15 +589,15 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
}
}()

// dial will re-use connection; old connection can still be used when module
// dial will re-use mod.conn; old connection can still be used when module
// crashes.
if err := mod.dial(mod.conn); err != nil {
if err := mod.dial(); err != nil {
mgr.logger.Errorw("error while dialing restarted module",
"module", mod.name, "error", err)
return orphanedResourceNames
}

if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
if err := mod.checkReady(ctx, mgr.parentAddr, mgr.logger); err != nil {
mgr.logger.Errorw("error while waiting for restarted module to be ready",
"module", mod.name, "error", err)
return orphanedResourceNames
Expand All @@ -576,10 +607,9 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
return nil
}

// dial will use the passed-in connection to make a new module service client
// or Dial m.addr if the passed-in connection is nil.
func (m *module) dial(conn *grpc.ClientConn) error {
m.conn = conn
// dial will use m.conn to make a new module service client or Dial m.addr if
// m.conn is nil.
func (m *module) dial() error {
if m.conn == nil {
// TODO(PRODUCT-343): session support probably means interceptors here
var err error
Expand All @@ -603,8 +633,8 @@ func (m *module) dial(conn *grpc.ClientConn) error {
return nil
}

func (m *module) checkReady(ctx context.Context, parentAddr string) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, time.Second*30)
func (m *module) checkReady(ctx context.Context, parentAddr string, logger golog.Logger) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, rutils.GetResourceConfigurationTimeout(logger))
defer cancelFunc()

for {
Expand Down Expand Up @@ -655,7 +685,7 @@ func (m *module) startProcess(
return errors.WithMessage(err, "module startup failed")
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*30)
ctxTimeout, cancel := context.WithTimeout(ctx, rutils.GetResourceConfigurationTimeout(logger))
defer cancel()
for {
select {
Expand Down
60 changes: 57 additions & 3 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
modmanageroptions "go.viam.com/rdk/module/modmanager/options"
"go.viam.com/rdk/resource"
rtestutils "go.viam.com/rdk/testutils"
rutils "go.viam.com/rdk/utils"
)

func TestModManagerFunctions(t *testing.T) {
Expand Down Expand Up @@ -55,16 +56,16 @@ func TestModManagerFunctions(t *testing.T) {
err = mod.startProcess(ctx, parentAddr, nil, logger)
test.That(t, err, test.ShouldBeNil)

err = mod.dial(nil)
err = mod.dial()
test.That(t, err, test.ShouldBeNil)

// check that dial can re-use connections.
oldConn := mod.conn
err = mod.dial(mod.conn)
err = mod.dial()
test.That(t, err, test.ShouldBeNil)
test.That(t, mod.conn, test.ShouldEqual, oldConn)

err = mod.checkReady(ctx, parentAddr)
err = mod.checkReady(ctx, parentAddr, logger)
test.That(t, err, test.ShouldBeNil)

mod.registerResources(mgr, logger)
Expand Down Expand Up @@ -459,6 +460,59 @@ func TestModuleReloading(t *testing.T) {
// Assert that RemoveOrphanedResources was called once.
test.That(t, dummyRemoveOrphanedResourcesCallCount.Load(), test.ShouldEqual, 1)
})
t.Run("immediate crash is not restarted", func(t *testing.T) {
logger, logs := golog.NewObservedTestLogger(t)

modCfg.ExePath = rutils.ResolveFile("module/testmodule/fakemodule.sh")

// Lower global timeout early to avoid race with actual restart code.
defer func(oriOrigVal time.Duration) {
oueRestartInterval = oriOrigVal
}(oueRestartInterval)
oueRestartInterval = 10 * time.Millisecond

// Lower resource configuration timeout to avoid waiting for 60 seconds
// for manager.Add to time out waiting for module to start listening.
defer func() {
test.That(t, os.Unsetenv(rutils.ResourceConfigurationTimeoutEnvVar),
test.ShouldBeNil)
}()
test.That(t, os.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "10ms"),
test.ShouldBeNil)

// This test neither uses a resource manager nor asserts anything about
// the existence of resources in the graph. Use a dummy
// RemoveOrphanedResources function so orphaned resource logic does not
// panic.
dummyRemoveOrphanedResources := func(context.Context, []resource.Name) {}
mgr := NewManager(parentAddr, logger, modmanageroptions.Options{
UntrustedEnv: false,
RemoveOrphanedResources: dummyRemoveOrphanedResources,
})
err = mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring,
"timed out waiting for module test-module to start listening")

// Assert that manager removes module after immediate crash.
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, len(mgr.Configs()), test.ShouldEqual, 0)
})

err = mgr.Close(ctx)
test.That(t, err, test.ShouldBeNil)

// Assert that logs reflect that fakemodule exited without responding to a
// ready request, and the manager did not try to nor succeed in restarting
// it.
test.That(t, logs.FilterMessageSnippet(
"module has unexpectedly exited without responding to a ready request").Len(),
test.ShouldEqual, 1)
test.That(t, logs.FilterMessageSnippet("attempting to restart it").Len(),
test.ShouldEqual, 0)
test.That(t, logs.FilterMessageSnippet("module successfully restarted").Len(),
test.ShouldEqual, 0)
})
}

func TestDebugModule(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions module/testmodule/fakemodule.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

# fakemodule is a stand-in "module" that prints a single message and then
# exits. It is used to test that modules that never respond to ready requests
# will not be restarted.

printf '%s\n' "this is a fake module; exiting now"
4 changes: 2 additions & 2 deletions resource/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (e *mustRebuildError) Error() string {

// NewBuildTimeoutError is used when a resource times out during construction or reconfiguration.
func NewBuildTimeoutError(name Name) error {
envVar := "VIAM_RESOURCE_CONFIGURATION_TIMEOUT"
return fmt.Errorf(
"resource %s timed out during reconfigure. The default timeout is 1min; update %s env variable to override", name, envVar)
"resource %s timed out during reconfigure. The default timeout is %v; update %s env variable to override",
name, utils.DefaultResourceConfigurationTimeout, utils.ResourceConfigurationTimeoutEnvVar)
}

// DependencyNotFoundError is used when a resource is not found in a dependencies.
Expand Down
17 changes: 1 addition & 16 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package robotimpl

import (
"context"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,8 +39,6 @@ import (

var _ = robot.LocalRobot(&localRobot{})

var resourceConfigurationTimeout = time.Minute

// localRobot satisfies robot.LocalRobot and defers most
// logic to its manager.
type localRobot struct {
Expand Down Expand Up @@ -658,18 +655,6 @@ func (r *localRobot) newResource(
return resInfo.DeprecatedRobotConstructor(ctx, r, conf, resLogger)
}

// getTimeout returns the resource configuration timeout. When the
// VIAM_RESOURCE_CONFIGURATION_TIMEOUT environment variable is set to a value
// that time.ParseDuration accepts, that duration is returned. When it is unset
// or empty, the package-level resourceConfigurationTimeout is returned. When
// it is set but cannot be parsed, a warning is logged and
// resourceConfigurationTimeout is returned.
func (r *localRobot) getTimeout() time.Duration {
	raw := os.Getenv("VIAM_RESOURCE_CONFIGURATION_TIMEOUT")
	if raw == "" {
		return resourceConfigurationTimeout
	}

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr == nil {
		return parsed
	}

	r.logger.Warn("Failed to parse VIAM_RESOURCE_CONFIGURATION_TIMEOUT env var, falling back to default 1 minute timeout")
	return resourceConfigurationTimeout
}

func (r *localRobot) updateWeakDependents(ctx context.Context) {
// track that we are current in resources up to the latest update time. This will
// be used to determine if this method should be called while completing a config.
Expand Down Expand Up @@ -698,7 +683,7 @@ func (r *localRobot) updateWeakDependents(ctx context.Context) {
}
}

timeout := r.getTimeout()
timeout := utils.GetResourceConfigurationTimeout(r.logger)
// NOTE(erd): this is intentionally hard coded since these services are treated specially with
// how they request dependencies or consume the robot's config. We should make an effort to
// formalize these as services that, while internal, obey the reconfigure lifecycle.
Expand Down
70 changes: 70 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3104,3 +3104,73 @@ func TestResourcelessModuleRemove(t *testing.T) {
test.ShouldEqual, 1)
})
}

// TestCrashedModuleReconfigure builds a robot with a working module that
// provides the generic component "h", reconfigures the module to an executable
// that exits immediately, and asserts that the "exited without responding to a
// ready request" error is logged exactly once and that "h" is no longer found.
// It then reconfigures back to the working module and asserts that "h"
// eventually becomes available again.
func TestCrashedModuleReconfigure(t *testing.T) {
	ctx := context.Background()
	logger, logs := golog.NewObservedTestLogger(t)

	testPath, err := rtestutils.BuildTempModule(t, "module/testmodule")
	test.That(t, err, test.ShouldBeNil)

	// Lower resource configuration timeout to avoid waiting for 60 seconds
	// for manager.Add to time out waiting for module to start listening.
	// t.Setenv restores the variable's previous value when the test finishes,
	// rather than unconditionally unsetting it.
	t.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "500ms")

	// Manually define model, as importing it can cause double registration.
	helperModel := resource.NewModel("rdk", "test", "helper")

	cfg := &config.Config{
		Modules: []config.Module{
			{
				Name:    "mod",
				ExePath: testPath,
			},
		},
		Components: []resource.Config{
			{
				Name:  "h",
				Model: helperModel,
				API:   generic.API,
			},
		},
	}
	r, err := robotimpl.New(ctx, cfg, logger)
	test.That(t, err, test.ShouldBeNil)
	defer func() {
		test.That(t, r.Close(context.Background()), test.ShouldBeNil)
	}()

	// Sanity check: "h" is available while the working module is configured.
	_, err = r.ResourceByName(generic.Named("h"))
	test.That(t, err, test.ShouldBeNil)

	// Reconfigure module to a module that immediately crashes. Assert that "h"
	// is removed and the manager did not attempt to restart the crashed module.
	cfg.Modules[0].ExePath = rutils.ResolveFile("module/testmodule/fakemodule.sh")
	r.Reconfigure(ctx, cfg)

	testutils.WaitForAssertion(t, func(tb testing.TB) {
		test.That(tb, logs.FilterMessageSnippet(
			"module has unexpectedly exited without responding to a ready request").Len(),
			test.ShouldEqual, 1)
	})

	_, err = r.ResourceByName(generic.Named("h"))
	test.That(t, err, test.ShouldNotBeNil)
	test.That(t, err, test.ShouldBeError,
		resource.NewNotFoundError(generic.Named("h")))

	// Reconfigure module back to testmodule. Assert that 'h' is eventually
	// added back to the resource manager (the module recovers).
	cfg.Modules[0].ExePath = testPath
	r.Reconfigure(ctx, cfg)

	testutils.WaitForAssertion(t, func(tb testing.TB) {
		_, err = r.ResourceByName(generic.Named("h"))
		test.That(tb, err, test.ShouldBeNil)
	})
}
2 changes: 1 addition & 1 deletion robot/impl/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (manager *resourceManager) completeConfig(
}

resourceNames := manager.resources.ReverseTopologicalSort()
timeout := robot.getTimeout()
timeout := rutils.GetResourceConfigurationTimeout(manager.logger)
for _, resName := range resourceNames {
resChan := make(chan struct{}, 1)
resName := resName
Expand Down
9 changes: 7 additions & 2 deletions robot/impl/robot_reconfigure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
_ "go.viam.com/rdk/services/sensors/builtin"
rdktestutils "go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/robottestutils"
rutils "go.viam.com/rdk/utils"
)

var (
Expand Down Expand Up @@ -3565,9 +3566,13 @@ func TestResourceConstructTimeout(t *testing.T) {

// create new cfg with wheeled base modified to trigger Reconfigure, set timeout
// to the shortest possible window to ensure timeout
defer func() {
test.That(t, os.Unsetenv(rutils.ResourceConfigurationTimeoutEnvVar),
test.ShouldBeNil)
}()
test.That(t, os.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "1ns"),
test.ShouldBeNil)

resourceConfigurationTimeout = time.Nanosecond
defer func() { resourceConfigurationTimeout = time.Minute }()
newestCfg := &config.Config{
Components: []resource.Config{
{
Expand Down
Loading

0 comments on commit 31753fb

Please sign in to comment.