Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-3329, RSDK-4070 - Do not restart modules that crash before ready and respect resource configuration timeout #2645

Merged
merged 7 commits into from
Jul 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, conn *grpc.Clie
name: conf.Name,
exe: conf.ExePath,
logLevel: conf.LogLevel,
conn: conn,
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
resources: map[resource.Name]*addedResource{},
}
mgr.modules[conf.Name] = mod
Expand All @@ -144,12 +145,12 @@ func (mgr *Manager) add(ctx context.Context, conf config.Module, conn *grpc.Clie
}
}()

// dial will re-use conn if it's non-nil (module being added in a Reconfigure).
if err := mod.dial(conn); err != nil {
// dial will re-use mod.conn if it's non-nil (module being added in a Reconfigure).
if err := mod.dial(); err != nil {
return errors.WithMessage(err, "error while dialing module "+mod.name)
}

if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
if err := mod.checkReady(ctx, mgr.parentAddr, mgr.logger); err != nil {
return errors.WithMessage(err, "error while waiting for module to be ready "+mod.name)
}

Expand Down Expand Up @@ -444,6 +445,36 @@ func (mgr *Manager) newOnUnexpectedExitHandler(mod *module) func(exitCode int) b
if mod.inRecovery.Load() {
return false
}

// If mod.handles was never set, the module was never actually ready in the
// first place before crashing. Log an error and do not attempt a restart;
// something is likely wrong with the module implementation.
mgr.mu.Lock()
if mod.handles == nil {
Otterverse marked this conversation as resolved.
Show resolved Hide resolved
mgr.logger.Errorw(
"module has unexpectedly exited without responding to a ready request ",
"module", mod.name,
"exit_code", exitCode,
)
// Remove module and close connection. Process will already be stopped.
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
for r, m := range mgr.rMap {
if m == mod {
delete(mgr.rMap, r)
}
}
delete(mgr.modules, mod.name)
if mod.conn != nil {
if err := mod.conn.Close(); err != nil {
mgr.logger.Errorw("error while closing connection from crashed module",
"module", mod.name,
"error", err)
}
}
mgr.mu.Unlock()
return false
}
mgr.mu.Unlock()

mod.inRecovery.Store(true)
defer mod.inRecovery.Store(false)

Expand Down Expand Up @@ -558,15 +589,15 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
}
}()

// dial will re-use connection; old connection can still be used when module
// dial will re-use mod.conn; old connection can still be used when module
// crashes.
if err := mod.dial(mod.conn); err != nil {
if err := mod.dial(); err != nil {
mgr.logger.Errorw("error while dialing restarted module",
"module", mod.name, "error", err)
return orphanedResourceNames
}

if err := mod.checkReady(ctx, mgr.parentAddr); err != nil {
if err := mod.checkReady(ctx, mgr.parentAddr, mgr.logger); err != nil {
mgr.logger.Errorw("error while waiting for restarted module to be ready",
"module", mod.name, "error", err)
return orphanedResourceNames
Expand All @@ -576,10 +607,9 @@ func (mgr *Manager) attemptRestart(ctx context.Context, mod *module) []resource.
return nil
}

// dial will use the passed-in connection to make a new module service client
// or Dial m.addr if the passed-in connection is nil.
func (m *module) dial(conn *grpc.ClientConn) error {
m.conn = conn
// dial will use m.conn to make a new module service client or Dial m.addr if
// m.conn is nil.
func (m *module) dial() error {
if m.conn == nil {
// TODO(PRODUCT-343): session support probably means interceptors here
var err error
Expand All @@ -603,8 +633,8 @@ func (m *module) dial(conn *grpc.ClientConn) error {
return nil
}

func (m *module) checkReady(ctx context.Context, parentAddr string) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, time.Second*30)
func (m *module) checkReady(ctx context.Context, parentAddr string, logger golog.Logger) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, rutils.GetResourceConfigurationTimeout(logger))
defer cancelFunc()

for {
Expand Down Expand Up @@ -655,7 +685,7 @@ func (m *module) startProcess(
return errors.WithMessage(err, "module startup failed")
}

ctxTimeout, cancel := context.WithTimeout(ctx, time.Second*30)
ctxTimeout, cancel := context.WithTimeout(ctx, rutils.GetResourceConfigurationTimeout(logger))
defer cancel()
for {
select {
Expand Down
60 changes: 57 additions & 3 deletions module/modmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
modmanageroptions "go.viam.com/rdk/module/modmanager/options"
"go.viam.com/rdk/resource"
rtestutils "go.viam.com/rdk/testutils"
rutils "go.viam.com/rdk/utils"
)

func TestModManagerFunctions(t *testing.T) {
Expand Down Expand Up @@ -55,16 +56,16 @@ func TestModManagerFunctions(t *testing.T) {
err = mod.startProcess(ctx, parentAddr, nil, logger)
test.That(t, err, test.ShouldBeNil)

err = mod.dial(nil)
err = mod.dial()
test.That(t, err, test.ShouldBeNil)

// check that dial can re-use connections.
oldConn := mod.conn
err = mod.dial(mod.conn)
err = mod.dial()
test.That(t, err, test.ShouldBeNil)
test.That(t, mod.conn, test.ShouldEqual, oldConn)

err = mod.checkReady(ctx, parentAddr)
err = mod.checkReady(ctx, parentAddr, logger)
test.That(t, err, test.ShouldBeNil)

mod.registerResources(mgr, logger)
Expand Down Expand Up @@ -459,6 +460,59 @@ func TestModuleReloading(t *testing.T) {
// Assert that RemoveOrphanedResources was called once.
test.That(t, dummyRemoveOrphanedResourcesCallCount.Load(), test.ShouldEqual, 1)
})
t.Run("immediate crash is not restarted", func(t *testing.T) {
benjirewis marked this conversation as resolved.
Show resolved Hide resolved
logger, logs := golog.NewObservedTestLogger(t)

modCfg.ExePath = rutils.ResolveFile("module/testmodule/fakemodule.sh")

// Lower global timeout early to avoid race with actual restart code.
defer func(oriOrigVal time.Duration) {
oueRestartInterval = oriOrigVal
}(oueRestartInterval)
oueRestartInterval = 10 * time.Millisecond

// Lower resource configuration timeout to avoid waiting for 60 seconds
// for manager.Add to time out waiting for module to start listening.
defer func() {
test.That(t, os.Unsetenv(rutils.ResourceConfigurationTimeoutEnvVar),
test.ShouldBeNil)
}()
test.That(t, os.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "10ms"),
test.ShouldBeNil)

// This test neither uses a resource manager nor asserts anything about
// the existence of resources in the graph. Use a dummy
// RemoveOrphanedResources function so orphaned resource logic does not
// panic.
dummyRemoveOrphanedResources := func(context.Context, []resource.Name) {}
mgr := NewManager(parentAddr, logger, modmanageroptions.Options{
UntrustedEnv: false,
RemoveOrphanedResources: dummyRemoveOrphanedResources,
})
err = mgr.Add(ctx, modCfg)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring,
"timed out waiting for module test-module to start listening")

// Assert that manager removes module after immediate crash.
testutils.WaitForAssertion(t, func(tb testing.TB) {
test.That(tb, len(mgr.Configs()), test.ShouldEqual, 0)
})

err = mgr.Close(ctx)
test.That(t, err, test.ShouldBeNil)

// Assert that logs reflect that fakemodule exited without responding to a
// ready request, and the manager did not try to nor succeed in restarting
// it.
test.That(t, logs.FilterMessageSnippet(
"module has unexpectedly exited without responding to a ready request").Len(),
test.ShouldEqual, 1)
test.That(t, logs.FilterMessageSnippet("attempting to restart it").Len(),
test.ShouldEqual, 0)
test.That(t, logs.FilterMessageSnippet("module successfully restarted").Len(),
test.ShouldEqual, 0)
})
}

func TestDebugModule(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions module/testmodule/fakemodule.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

# fakemodule is not a real module: it prints a single message and then exits
# right away. It is used to test that modules that never respond to ready
# requests will not be restarted.

printf '%s\n' "this is a fake module; exiting now"
4 changes: 2 additions & 2 deletions resource/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ func (e *mustRebuildError) Error() string {

// NewBuildTimeoutError is used when a resource times out during construction or reconfiguration.
func NewBuildTimeoutError(name Name) error {
envVar := "VIAM_RESOURCE_CONFIGURATION_TIMEOUT"
return fmt.Errorf(
"resource %s timed out during reconfigure. The default timeout is 1min; update %s env variable to override", name, envVar)
"resource %s timed out during reconfigure. The default timeout is %v; update %s env variable to override",
name, utils.DefaultResourceConfigurationTimeout, utils.ResourceConfigurationTimeoutEnvVar)
}

// DependencyNotFoundError is used when a resource is not found in a dependencies.
Expand Down
17 changes: 1 addition & 16 deletions robot/impl/local_robot.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package robotimpl

import (
"context"
"os"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -40,8 +39,6 @@ import (

var _ = robot.LocalRobot(&localRobot{})

var resourceConfigurationTimeout = time.Minute

// localRobot satisfies robot.LocalRobot and defers most
// logic to its manager.
type localRobot struct {
Expand Down Expand Up @@ -653,18 +650,6 @@ func (r *localRobot) newResource(
return resInfo.DeprecatedRobotConstructor(ctx, r, conf, resLogger)
}

// getTimeout returns the timeout to apply to resource configuration. When the
// VIAM_RESOURCE_CONFIGURATION_TIMEOUT env var is set to a parseable duration,
// that duration is returned; when it is unset or cannot be parsed,
// resourceConfigurationTimeout is returned (with a warning logged in the
// unparseable case).
func (r *localRobot) getTimeout() time.Duration {
	raw := os.Getenv("VIAM_RESOURCE_CONFIGURATION_TIMEOUT")
	if raw == "" {
		// No override requested.
		return resourceConfigurationTimeout
	}

	parsed, parseErr := time.ParseDuration(raw)
	if parseErr != nil {
		r.logger.Warn("Failed to parse VIAM_RESOURCE_CONFIGURATION_TIMEOUT env var, falling back to default 1 minute timeout")
		return resourceConfigurationTimeout
	}
	return parsed
}

func (r *localRobot) updateWeakDependents(ctx context.Context) {
// track that we are current in resources up to the latest update time. This will
// be used to determine if this method should be called while completing a config.
Expand Down Expand Up @@ -693,7 +678,7 @@ func (r *localRobot) updateWeakDependents(ctx context.Context) {
}
}

timeout := r.getTimeout()
timeout := utils.GetResourceConfigurationTimeout(r.logger)
// NOTE(erd): this is intentionally hard coded since these services are treated specially with
// how they request dependencies or consume the robot's config. We should make an effort to
// formalize these as services that, while internal, obey the reconfigure lifecycle.
Expand Down
70 changes: 70 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3100,3 +3100,73 @@ func TestResourcelessModuleRemove(t *testing.T) {
test.ShouldEqual, 1)
})
}

// TestCrashedModuleReconfigure builds a robot backed by the test module,
// reconfigures the module to an executable that exits immediately, and then
// reconfigures it back. It asserts that the "exited without responding to a
// ready request" error is logged exactly once and that resource "h" is not
// found while the crashing module is configured, and that "h" is eventually
// available again once the working module is restored.
func TestCrashedModuleReconfigure(t *testing.T) {
	ctx := context.Background()
	logger, logs := golog.NewObservedTestLogger(t)

	testPath, err := rtestutils.BuildTempModule(t, "module/testmodule")
	test.That(t, err, test.ShouldBeNil)

	// Lower resource configuration timeout to avoid waiting for the default
	// timeout while the crashing module fails to start listening. t.Setenv
	// restores whatever value the variable had before the test (instead of
	// unconditionally unsetting it) once the test completes.
	t.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "500ms")

	// Manually define model, as importing it can cause double registration.
	helperModel := resource.NewModel("rdk", "test", "helper")

	cfg := &config.Config{
		Modules: []config.Module{
			{
				Name:    "mod",
				ExePath: testPath,
			},
		},
		Components: []resource.Config{
			{
				Name:  "h",
				Model: helperModel,
				API:   generic.API,
			},
		},
	}
	r, err := robotimpl.New(ctx, cfg, logger)
	test.That(t, err, test.ShouldBeNil)
	defer func() {
		test.That(t, r.Close(context.Background()), test.ShouldBeNil)
	}()

	// Sanity check: "h" is served while the working module is configured.
	_, err = r.ResourceByName(generic.Named("h"))
	test.That(t, err, test.ShouldBeNil)

	// Reconfigure module to a module that immediately crashes. Assert that the
	// unexpected-exit error is logged once and that "h" is removed.
	cfg.Modules[0].ExePath = rutils.ResolveFile("module/testmodule/fakemodule.sh")
	r.Reconfigure(ctx, cfg)

	testutils.WaitForAssertion(t, func(tb testing.TB) {
		test.That(tb, logs.FilterMessageSnippet(
			"module has unexpectedly exited without responding to a ready request").Len(),
			test.ShouldEqual, 1)
	})

	_, err = r.ResourceByName(generic.Named("h"))
	test.That(t, err, test.ShouldNotBeNil)
	test.That(t, err, test.ShouldBeError,
		resource.NewNotFoundError(generic.Named("h")))

	// Reconfigure module back to testmodule. Assert that 'h' is eventually
	// added back to the resource manager (the module recovers).
	cfg.Modules[0].ExePath = testPath
	r.Reconfigure(ctx, cfg)

	testutils.WaitForAssertion(t, func(tb testing.TB) {
		// Keep the retried lookup's error local to the closure rather than
		// overwriting the enclosing function's err on every attempt.
		_, err := r.ResourceByName(generic.Named("h"))
		test.That(tb, err, test.ShouldBeNil)
	})
}
2 changes: 1 addition & 1 deletion robot/impl/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func (manager *resourceManager) completeConfig(
}

resourceNames := manager.resources.ReverseTopologicalSort()
timeout := robot.getTimeout()
timeout := rutils.GetResourceConfigurationTimeout(manager.logger)
for _, resName := range resourceNames {
resChan := make(chan struct{}, 1)
resName := resName
Expand Down
9 changes: 7 additions & 2 deletions robot/impl/robot_reconfigure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
_ "go.viam.com/rdk/services/sensors/builtin"
rdktestutils "go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/robottestutils"
rutils "go.viam.com/rdk/utils"
)

var (
Expand Down Expand Up @@ -3565,9 +3566,13 @@ func TestResourceConstructTimeout(t *testing.T) {

// create new cfg with wheeled base modified to trigger Reconfigure, set timeout
// to the shortest possible window to ensure timeout
defer func() {
test.That(t, os.Unsetenv(rutils.ResourceConfigurationTimeoutEnvVar),
test.ShouldBeNil)
}()
test.That(t, os.Setenv(rutils.ResourceConfigurationTimeoutEnvVar, "1ns"),
test.ShouldBeNil)

resourceConfigurationTimeout = time.Nanosecond
defer func() { resourceConfigurationTimeout = time.Minute }()
newestCfg := &config.Config{
Components: []resource.Config{
{
Expand Down
Loading