diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 3aaf13dbdda..58026c650bf 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -18,7 +18,6 @@ import ( "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" "github.com/hashicorp/nomad/client/allocrunner/taskrunner" - "github.com/hashicorp/nomad/client/allocwatcher" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" @@ -152,10 +151,10 @@ type allocRunner struct { // prevAllocWatcher allows waiting for any previous or preempted allocations // to exit - prevAllocWatcher allocwatcher.PrevAllocWatcher + prevAllocWatcher config.PrevAllocWatcher // prevAllocMigrator allows the migration of a previous allocations alloc dir. - prevAllocMigrator allocwatcher.PrevAllocMigrator + prevAllocMigrator config.PrevAllocMigrator // dynamicRegistry contains all locally registered dynamic plugins (e.g csi // plugins). @@ -190,7 +189,7 @@ type allocRunner struct { // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. - rpcClient RPCer + rpcClient config.RPCer // serviceRegWrapper is the handler wrapper that is used by service hooks // to perform service and check registration and deregistration. @@ -203,13 +202,8 @@ type allocRunner struct { getter cinterfaces.ArtifactGetter } -// RPCer is the interface needed by hooks to make RPC calls. -type RPCer interface { - RPC(method string, args interface{}, reply interface{}) error -} - // NewAllocRunner returns a new allocation runner. -func NewAllocRunner(config *Config) (*allocRunner, error) { +func NewAllocRunner(config *config.AllocRunnerConfig) (interfaces.AllocRunner, error) { alloc := config.Alloc tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index beb2705e0ee..a477ae2432a 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allochealth" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" arstate "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/allocrunner/tasklifecycle" "github.com/hashicorp/nomad/client/allocrunner/taskrunner" @@ -34,7 +35,7 @@ import ( ) // destroy does a blocking destroy on an alloc runner -func destroy(ar *allocRunner) { +func destroy(ar interfaces.AllocRunner) { ar.Destroy() <-ar.DestroyCh() } @@ -50,7 +51,7 @@ func TestAllocRunner_AllocState_Initialized(t *testing.T) { defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) allocState := ar.AllocState() @@ -91,7 +92,7 @@ func TestAllocRunner_TaskLeader_KillTG(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -174,7 +175,7 @@ func TestAllocRunner_Lifecycle_Poststart(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -311,7 +312,7 @@ func TestAllocRunner_TaskMain_KillTG(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -433,7 +434,7 @@ func TestAllocRunner_Lifecycle_Poststop(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -518,13 +519,13 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { taskDefs []mock.LifecycleTaskDef isBatch bool hasLeader bool - action func(*allocRunner, *structs.Allocation) error + action func(interfaces.AllocRunner, *structs.Allocation) error expectedErr string expectedAfter map[string]structs.TaskState }{ { name: "restart entire allocation", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -538,7 +539,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, { name: "restart only running tasks", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartRunning(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -561,7 +562,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, isBatch: true, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -584,7 +585,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, isBatch: true, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartRunning(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -599,7 +600,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart entire allocation with leader", hasLeader: true, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartAll(ev) }, expectedAfter: map[string]structs.TaskState{ @@ -613,7 +614,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, { name: "stop from server", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { stopAlloc := alloc.Copy() stopAlloc.DesiredStatus = structs.AllocDesiredStatusStop ar.Update(stopAlloc) @@ -630,7 +631,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, { name: "restart main task", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartTask("main", ev) }, expectedAfter: map[string]structs.TaskState{ @@ -645,7 +646,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { { name: "restart leader main task", hasLeader: true, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartTask("main", ev) }, expectedAfter: map[string]structs.TaskState{ @@ -667,7 +668,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited return nil }, @@ -691,7 +692,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, hasLeader: true, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited return nil }, @@ -714,7 +715,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited return nil }, @@ -737,7 +738,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { time.Sleep(3 * time.Second) // make sure main task has exited return nil }, @@ -760,7 +761,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { {Name: "poststart-sidecar", RunFor: "100s", ExitCode: 0, Hook: "poststart", IsSidecar: true}, {Name: "poststop", RunFor: "1s", ExitCode: 0, Hook: "poststop", IsSidecar: false}, }, - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { // make sure main task has had a chance to restart once on its // own and fail again before we try to manually restart it time.Sleep(5 * time.Second) @@ -778,7 +779,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, { name: "restart prestart-sidecar task", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartTask("prestart-sidecar", ev) }, expectedAfter: map[string]structs.TaskState{ @@ -792,7 +793,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { }, { name: "restart poststart-sidecar task", - action: func(ar *allocRunner, alloc *structs.Allocation) error { + action: func(ar interfaces.AllocRunner, alloc *structs.Allocation) error { return ar.RestartTask("poststart-sidecar", ev) }, expectedAfter: map[string]structs.TaskState{ @@ -834,7 +835,7 @@ func TestAllocRunner_Lifecycle_Restart(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -993,7 +994,7 @@ func TestAllocRunner_TaskGroup_ShutdownDelay(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -1118,7 +1119,7 @@ func TestAllocRunner_TaskLeader_StopTG(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() @@ -1216,15 +1217,15 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { conf.StateDB = state.NewMemDB(conf.Logger) ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) // Mimic Nomad exiting before the leader stopping is able to stop other tasks. - ar.tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) - ar.tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + ar.(*allocRunner).tasks["leader"].UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled)) + ar.(*allocRunner).tasks["follower1"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) // Create a new AllocRunner to test RestoreState and Run ar2, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar2) if err := ar2.Restore(); err != nil { @@ -1268,8 +1269,9 @@ func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) { // Use a memory backed statedb conf.StateDB = state.NewMemDB(conf.Logger) - ar, err := NewAllocRunner(conf) - require.NoError(t, err) + arIface, err := NewAllocRunner(conf) + must.NoError(t, err) + ar := arIface.(*allocRunner) go ar.Run() defer destroy(ar) @@ -1292,9 +1294,10 @@ func TestAllocRunner_Restore_LifecycleHooks(t *testing.T) { ar.tasks["web"].UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) // Create a new AllocRunner to test Restore and Run. - ar2, err := NewAllocRunner(conf) - require.NoError(t, err) - require.NoError(t, ar2.Restore()) + arIface2, err := NewAllocRunner(conf) + must.NoError(t, err) + ar2 := arIface2.(*allocRunner) + must.NoError(t, ar2.Restore()) go ar2.Run() defer destroy(ar2) @@ -1327,8 +1330,9 @@ func TestAllocRunner_Update_Semantics(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() - ar, err := NewAllocRunner(conf) - require.NoError(err) + arIface, err := NewAllocRunner(conf) + must.NoError(t, err) + ar := arIface.(*allocRunner) upd1 := updatedAlloc(alloc) ar.Update(upd1) @@ -1388,7 +1392,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_Migration(t *testing.T) { defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) go ar.Run() defer destroy(ar) @@ -1442,7 +1446,7 @@ func TestAllocRunner_DeploymentHealth_Healthy_NoChecks(t *testing.T) { defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) start, done := time.Now(), time.Time{} go ar.Run() @@ -1536,7 +1540,7 @@ func TestAllocRunner_DeploymentHealth_Unhealthy_Checks(t *testing.T) { } ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) go ar.Run() defer destroy(ar) @@ -1585,7 +1589,7 @@ func TestAllocRunner_Destroy(t *testing.T) { conf.StateDB = state.NewMemDB(conf.Logger) ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) go ar.Run() // Wait for alloc to be running @@ -1625,8 +1629,8 @@ func TestAllocRunner_Destroy(t *testing.T) { require.Nil(t, ts) // Assert the alloc directory was cleaned - if _, err := os.Stat(ar.allocDir.AllocDir); err == nil { - require.Fail(t, "alloc dir still exists: %v", ar.allocDir.AllocDir) + if _, err := os.Stat(ar.(*allocRunner).allocDir.AllocDir); err == nil { + require.Fail(t, "alloc dir still exists: %v", ar.(*allocRunner).allocDir.AllocDir) } else if !os.IsNotExist(err) { require.Failf(t, "expected NotExist error", "found %v", err) } @@ -1640,7 +1644,7 @@ func TestAllocRunner_SimpleRun(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) go ar.Run() defer destroy(ar) @@ -1675,7 +1679,8 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + ar.Run() defer destroy(ar) @@ -1683,9 +1688,9 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { // Step 2. Modify its directory task := alloc.Job.TaskGroups[0].Tasks[0] - dataFile := filepath.Join(ar.allocDir.SharedDir, "data", "data_file") + dataFile := filepath.Join(ar.GetAllocDir().SharedDir, "data", "data_file") os.WriteFile(dataFile, []byte("hello world"), os.ModePerm) - taskDir := ar.allocDir.TaskDirs[task.Name] + taskDir := ar.GetAllocDir().TaskDirs[task.Name] taskLocalFile := filepath.Join(taskDir.LocalDir, "local_file") os.WriteFile(taskLocalFile, []byte("good bye world"), os.ModePerm) @@ -1702,7 +1707,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { }) defer cleanup() ar2, err := NewAllocRunner(conf2) - require.NoError(t, err) + must.NoError(t, err) ar2.Run() defer destroy(ar2) @@ -1710,11 +1715,11 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { WaitForClientState(t, ar, structs.AllocClientStatusComplete) // Ensure that data from ar was moved to ar2 - dataFile = filepath.Join(ar2.allocDir.SharedDir, "data", "data_file") + dataFile = filepath.Join(ar2.GetAllocDir().SharedDir, "data", "data_file") fileInfo, _ := os.Stat(dataFile) require.NotNilf(t, fileInfo, "file %q not found", dataFile) - taskDir = ar2.allocDir.TaskDirs[task.Name] + taskDir = ar2.GetAllocDir().TaskDirs[task.Name] taskLocalFile = filepath.Join(taskDir.LocalDir, "local_file") fileInfo, _ = os.Stat(taskLocalFile) require.NotNilf(t, fileInfo, "file %q not found", dataFile) @@ -1757,7 +1762,8 @@ func TestAllocRunner_HandlesArtifactFailure(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + go ar.Run() defer destroy(ar) @@ -1861,7 +1867,8 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) { } ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + defer destroy(ar) go ar.Run() upd := conf.StateUpdater.(*MockStateUpdater) @@ -1931,7 +1938,8 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + defer destroy(ar) go ar.Run() upd := conf.StateUpdater.(*MockStateUpdater) @@ -1951,7 +1959,7 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { // Update the alloc to be terminal which should cause the alloc runner to // stop the tasks and wait for a destroy. - update := ar.alloc.Copy() + update := ar.Alloc().Copy() update.DesiredStatus = structs.AllocDesiredStatusStop ar.Update(update) @@ -1967,8 +1975,8 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { } // Check the alloc directory still exists - if _, err := os.Stat(ar.allocDir.AllocDir); err != nil { - return false, fmt.Errorf("alloc dir destroyed: %v", ar.allocDir.AllocDir) + if _, err := os.Stat(ar.GetAllocDir().AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.GetAllocDir().AllocDir) } return true, nil @@ -1991,8 +1999,8 @@ func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { } // Check the alloc directory was cleaned - if _, err := os.Stat(ar.allocDir.AllocDir); err == nil { - return false, fmt.Errorf("alloc dir still exists: %v", ar.allocDir.AllocDir) + if _, err := os.Stat(ar.GetAllocDir().AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.GetAllocDir().AllocDir) } else if !os.IsNotExist(err) { return false, fmt.Errorf("stat err: %v", err) } @@ -2015,7 +2023,8 @@ func TestAllocRunner_PersistState_Destroyed(t *testing.T) { defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + defer destroy(ar) go ar.Run() @@ -2111,12 +2120,12 @@ func TestAllocRunner_Reconnect(t *testing.T) { defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) defer destroy(ar) go ar.Run() - for _, taskRunner := range ar.tasks { + for _, taskRunner := range ar.(*allocRunner).tasks { taskRunner.UpdateState(tc.taskState, tc.taskEvent) } @@ -2190,7 +2199,8 @@ func TestAllocRunner_Lifecycle_Shutdown_Order(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) + defer destroy(ar) go ar.Run() @@ -2391,8 +2401,9 @@ func TestHasSidecarTasks(t *testing.T) { arConf, cleanup := testAllocRunnerConfig(t, alloc) defer cleanup() - ar, err := NewAllocRunner(arConf) - require.NoError(t, err) + arIface, err := NewAllocRunner(arConf) + must.NoError(t, err) + ar := arIface.(*allocRunner) require.Equal(t, tc.hasSidecars, hasSidecarTasks(ar.tasks), "sidecars") @@ -2428,8 +2439,9 @@ func TestAllocRunner_PreKill_RunOnDone(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc.Copy()) t.Cleanup(cleanup) - ar, err := NewAllocRunner(conf) + arIface, err := NewAllocRunner(conf) must.NoError(t, err) + ar := arIface.(*allocRunner) // set our custom prekill hook hook := new(allocPreKillHook) @@ -2458,8 +2470,9 @@ func TestAllocRunner_LastAcknowledgedStateIsCurrent(t *testing.T) { conf, cleanup := testAllocRunnerConfig(t, alloc.Copy()) t.Cleanup(cleanup) - ar, err := NewAllocRunner(conf) + arIface, err := NewAllocRunner(conf) must.NoError(t, err) + ar := arIface.(*allocRunner) ar.SetNetworkStatus(&structs.AllocNetworkStatus{ InterfaceName: "eth0", diff --git a/client/allocrunner/alloc_runner_unix_test.go b/client/allocrunner/alloc_runner_unix_test.go index b223590682a..2ee94d3da72 100644 --- a/client/allocrunner/alloc_runner_unix_test.go +++ b/client/allocrunner/alloc_runner_unix_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" + "github.com/shoenig/test/must" "github.com/stretchr/testify/require" ) @@ -58,7 +59,7 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { // Start and wait for task to be running ar, err := NewAllocRunner(conf) - require.NoError(t, err) + must.NoError(t, err) go ar.Run() defer destroy(ar) @@ -108,8 +109,9 @@ func TestAllocRunner_Restore_RunningTerminal(t *testing.T) { conf2.StateDB = conf.StateDB // Restore, start, and wait for task to be killed - ar2, err := NewAllocRunner(conf2) - require.NoError(t, err) + ar2Iface, err := NewAllocRunner(conf2) + must.NoError(t, err) + ar2 := ar2Iface.(*allocRunner) require.NoError(t, ar2.Restore()) @@ -168,8 +170,9 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { conf.StateDB = state.NewMemDB(conf.Logger) // Start and wait for task to be running - ar, err := NewAllocRunner(conf) - require.NoError(t, err) + arIface, err := NewAllocRunner(conf) + must.NoError(t, err) + ar := arIface.(*allocRunner) go ar.Run() defer destroy(ar) @@ -201,10 +204,10 @@ func TestAllocRunner_Restore_CompletedBatch(t *testing.T) { conf2.StateDB = conf.StateDB // Restore, start, and wait for task to be killed - ar2, err := NewAllocRunner(conf2) - require.NoError(t, err) - - require.NoError(t, ar2.Restore()) + ar2Iface, err := NewAllocRunner(conf2) + must.NoError(t, err) + ar2 := ar2Iface.(*allocRunner) + must.NoError(t, ar2.Restore()) go ar2.Run() defer destroy(ar2) @@ -252,9 +255,9 @@ func TestAllocRunner_PreStartFailuresLeadToFailed(t *testing.T) { conf.StateDB = state.NewMemDB(conf.Logger) // Start and wait for task to be running - ar, err := NewAllocRunner(conf) - require.NoError(t, err) - + arIface, err := NewAllocRunner(conf) + must.NoError(t, err) + ar := arIface.(*allocRunner) ar.runnerHooks = append(ar.runnerHooks, &allocFailingPrestartHook{}) go ar.Run() diff --git a/client/allocrunner/config.go b/client/allocrunner/config.go index 00738f1d0e0..ce7f4b0b9ca 100644 --- a/client/allocrunner/config.go +++ b/client/allocrunner/config.go @@ -2,98 +2,3 @@ // SPDX-License-Identifier: MPL-2.0 package allocrunner - -import ( - log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/allocwatcher" - clientconfig "github.com/hashicorp/nomad/client/config" - "github.com/hashicorp/nomad/client/consul" - "github.com/hashicorp/nomad/client/devicemanager" - "github.com/hashicorp/nomad/client/dynamicplugins" - "github.com/hashicorp/nomad/client/interfaces" - "github.com/hashicorp/nomad/client/lib/cgutil" - "github.com/hashicorp/nomad/client/pluginmanager/csimanager" - "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" - "github.com/hashicorp/nomad/client/serviceregistration" - "github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" - "github.com/hashicorp/nomad/client/serviceregistration/wrapper" - cstate "github.com/hashicorp/nomad/client/state" - "github.com/hashicorp/nomad/client/vaultclient" - "github.com/hashicorp/nomad/nomad/structs" -) - -// Config holds the configuration for creating an allocation runner. -type Config struct { - // Logger is the logger for the allocation runner. - Logger log.Logger - - // ClientConfig is the clients configuration. - ClientConfig *clientconfig.Config - - // Alloc captures the allocation that should be run. - Alloc *structs.Allocation - - // StateDB is used to store and restore state. - StateDB cstate.StateDB - - // Consul is the Consul client used to register task services and checks - Consul serviceregistration.Handler - - // ConsulProxies is the Consul client used to lookup supported envoy versions - // of the Consul agent. - ConsulProxies consul.SupportedProxiesAPI - - // ConsulSI is the Consul client used to manage service identity tokens. - ConsulSI consul.ServiceIdentityAPI - - // Vault is the Vault client to use to retrieve Vault tokens - Vault vaultclient.VaultClient - - // StateUpdater is used to emit updated task state - StateUpdater interfaces.AllocStateHandler - - // DeviceStatsReporter is used to lookup resource usage for alloc devices - DeviceStatsReporter interfaces.DeviceStatsReporter - - // PrevAllocWatcher handles waiting on previous or preempted allocations - PrevAllocWatcher allocwatcher.PrevAllocWatcher - - // PrevAllocMigrator allows the migration of a previous allocations alloc dir - PrevAllocMigrator allocwatcher.PrevAllocMigrator - - // DynamicRegistry contains all locally registered dynamic plugins (e.g csi - // plugins). - DynamicRegistry dynamicplugins.Registry - - // CSIManager is used to wait for CSI Volumes to be attached, and by the task - // runner to manage their mounting - CSIManager csimanager.Manager - - // DeviceManager is used to mount devices as well as lookup device - // statistics - DeviceManager devicemanager.Manager - - // DriverManager handles dispensing of driver plugins - DriverManager drivermanager.Manager - - // CpusetManager configures the cpuset cgroup if supported by the platform - CpusetManager cgutil.CpusetManager - - // ServersContactedCh is closed when the first GetClientAllocs call to - // servers succeeds and allocs are synced. - ServersContactedCh chan struct{} - - // RPCClient is the RPC Client that should be used by the allocrunner and its - // hooks to communicate with Nomad Servers. - RPCClient RPCer - - // ServiceRegWrapper is the handler wrapper that is used by service hooks - // to perform service and check registration and deregistration. - ServiceRegWrapper *wrapper.HandlerWrapper - - // CheckStore contains check result information. - CheckStore checkstore.Shim - - // Getter is an interface for retrieving artifacts. - Getter interfaces.ArtifactGetter -} diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index f18d23a1666..30e63ccc272 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -12,6 +12,7 @@ import ( hclog "github.com/hashicorp/go-hclog" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" cstructs "github.com/hashicorp/nomad/client/structs" @@ -30,7 +31,7 @@ type csiHook struct { csimanager csimanager.Manager // interfaces implemented by the allocRunner - rpcClient RPCer + rpcClient config.RPCer taskCapabilityGetter taskCapabilityGetter hookResources *cstructs.AllocHookResources @@ -49,7 +50,7 @@ type taskCapabilityGetter interface { GetTaskDriverCapabilities(string) (*drivers.Capabilities, error) } -func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook { +func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient config.RPCer, taskCapabilityGetter taskCapabilityGetter, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook { shutdownCtx, shutdownCancelFn := context.WithCancel(context.Background()) diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index 575008d76d8..d7889426643 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -4,24 +4,51 @@ package interfaces import ( + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/state" "github.com/hashicorp/nomad/client/pluginmanager/csimanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/drivers" ) -// AllocRunner is the interface for an allocation runner. +// AllocRunner is the interface to the allocRunner struct used by client.Client type AllocRunner interface { - // ID returns the ID of the allocation being run. - ID() string + Alloc() *structs.Allocation - // Run starts the runner and begins executing all the tasks as part of the - // allocation. Run() + Restore() error + Update(*structs.Allocation) + Reconnect(update *structs.Allocation) error + Shutdown() + Destroy() - // State returns a copy of the runners state object - State() *state.State + IsDestroyed() bool + IsMigrating() bool + IsWaiting() bool - TaskStateHandler + WaitCh() <-chan struct{} + DestroyCh() <-chan struct{} + ShutdownCh() <-chan struct{} + + AllocState() *state.State + PersistState() error + AcknowledgeState(*state.State) + LastAcknowledgedStateIsCurrent(*structs.Allocation) bool + SetClientStatus(string) + + Signal(taskName, signal string) error + RestartTask(taskName string, taskEvent *structs.TaskEvent) error + RestartRunning(taskEvent *structs.TaskEvent) error + RestartAll(taskEvent *structs.TaskEvent) error + + GetTaskEventHandler(taskName string) drivermanager.EventHandler + GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler + GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) + StatsReporter() AllocStatsReporter + Listener() *cstructs.AllocListener + GetAllocDir() *allocdir.AllocDir } // TaskStateHandler exposes a handler to be called when a task's state changes diff --git a/client/allocrunner/migrate_hook.go b/client/allocrunner/migrate_hook.go index e2b4da3b0c8..89ed93f98af 100644 --- a/client/allocrunner/migrate_hook.go +++ b/client/allocrunner/migrate_hook.go @@ -9,18 +9,18 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/allocwatcher" + "github.com/hashicorp/nomad/client/config" ) // diskMigrationHook migrates ephemeral disk volumes. Depends on alloc dir // being built but must be run before anything else manipulates the alloc dir. type diskMigrationHook struct { allocDir *allocdir.AllocDir - allocWatcher allocwatcher.PrevAllocMigrator + allocWatcher config.PrevAllocMigrator logger log.Logger } -func newDiskMigrationHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook { +func newDiskMigrationHook(logger log.Logger, allocWatcher config.PrevAllocMigrator, allocDir *allocdir.AllocDir) *diskMigrationHook { h := &diskMigrationHook{ allocDir: allocDir, allocWatcher: allocWatcher, diff --git a/client/allocrunner/testing.go b/client/allocrunner/testing.go index 5cd24881d0e..679f24f7a5a 100644 --- a/client/allocrunner/testing.go +++ b/client/allocrunner/testing.go @@ -11,8 +11,10 @@ import ( "sync" "testing" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/allocrunner/taskrunner/getter" "github.com/hashicorp/nomad/client/allocwatcher" + "github.com/hashicorp/nomad/client/config" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/consul" "github.com/hashicorp/nomad/client/devicemanager" @@ -67,7 +69,7 @@ func (m *MockStateUpdater) Reset() { m.mu.Unlock() } -func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, func()) { +func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*config.AllocRunnerConfig, func()) { clientConf, cleanup := clientconfig.TestClientConfig(t) consulRegMock := mock.NewServiceRegistrationHandler(clientConf.Logger) @@ -75,7 +77,7 @@ func testAllocRunnerConfig(t *testing.T, alloc *structs.Allocation) (*Config, fu stateDB := new(state.NoopDB) - conf := &Config{ + conf := &config.AllocRunnerConfig{ // Copy the alloc in case the caller edits and reuses it Alloc: alloc.Copy(), Logger: clientConf.Logger, @@ -107,10 +109,10 @@ func TestAllocRunnerFromAlloc(t *testing.T, alloc *structs.Allocation) (*allocRu require.NoError(t, err, "Failed to setup AllocRunner") } - return ar, cleanup + return ar.(*allocRunner), cleanup } -func WaitForClientState(t *testing.T, ar *allocRunner, state string) { +func WaitForClientState(t *testing.T, ar interfaces.AllocRunner, state string) { testutil.WaitForResult(func() (bool, error) { got := ar.AllocState().ClientStatus return got == state, diff --git a/client/allocrunner/upstream_allocs_hook.go b/client/allocrunner/upstream_allocs_hook.go index 4cc23a48d26..53fd59cbca9 100644 --- a/client/allocrunner/upstream_allocs_hook.go +++ b/client/allocrunner/upstream_allocs_hook.go @@ -7,17 +7,17 @@ import ( "context" log "github.com/hashicorp/go-hclog" - "github.com/hashicorp/nomad/client/allocwatcher" + "github.com/hashicorp/nomad/client/config" ) // upstreamAllocsHook waits for a PrevAllocWatcher to exit before allowing // an allocation to be executed type upstreamAllocsHook struct { - allocWatcher allocwatcher.PrevAllocWatcher + allocWatcher config.PrevAllocWatcher logger log.Logger } -func newUpstreamAllocsHook(logger log.Logger, allocWatcher allocwatcher.PrevAllocWatcher) *upstreamAllocsHook { +func newUpstreamAllocsHook(logger log.Logger, allocWatcher config.PrevAllocWatcher) *upstreamAllocsHook { h := &upstreamAllocsHook{ allocWatcher: allocWatcher, } diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go index cf94487507f..1df200e3bee 100644 --- a/client/allocwatcher/alloc_watcher.go +++ b/client/allocwatcher/alloc_watcher.go @@ -49,29 +49,6 @@ type AllocRunnerMeta interface { Alloc() *structs.Allocation } -// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to -// terminate whether or not the previous allocation is local or remote. -// See `PrevAllocMigrator` for migrating workloads. -type PrevAllocWatcher interface { - // Wait for previous alloc to terminate - Wait(context.Context) error - - // IsWaiting returns true if a concurrent caller is blocked in Wait - IsWaiting() bool -} - -// PrevAllocMigrator allows AllocRunners to migrate a previous allocation -// whether or not the previous allocation is local or remote. -type PrevAllocMigrator interface { - PrevAllocWatcher - - // IsMigrating returns true if a concurrent caller is in Migrate - IsMigrating() bool - - // Migrate data from previous alloc - Migrate(ctx context.Context, dest *allocdir.AllocDir) error -} - type Config struct { // Alloc is the current allocation which may need to block on its // previous allocation stopping. @@ -97,7 +74,7 @@ type Config struct { Logger hclog.Logger } -func newMigratorForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) PrevAllocMigrator { +func newMigratorForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, m AllocRunnerMeta) config.PrevAllocMigrator { logger := c.Logger.Named("alloc_migrator").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID) tasks := tg.Tasks @@ -136,7 +113,7 @@ func newMigratorForAlloc(c Config, tg *structs.TaskGroup, watchedAllocID string, // Note that c.Alloc.PreviousAllocation must NOT be used in this func as it // used for preemption which has a distinct field. The caller is responsible // for passing the allocation to be watched as watchedAllocID. -func newWatcherForAlloc(c Config, watchedAllocID string, m AllocRunnerMeta) PrevAllocWatcher { +func newWatcherForAlloc(c Config, watchedAllocID string, m AllocRunnerMeta) config.PrevAllocWatcher { logger := c.Logger.Named("alloc_watcher").With("alloc_id", c.Alloc.ID).With("previous_alloc", watchedAllocID) if m != nil { @@ -167,13 +144,13 @@ func newWatcherForAlloc(c Config, watchedAllocID string, m AllocRunnerMeta) Prev // For allocs which are either running on another node or have already // terminated their alloc runners, use a remote backend which watches the alloc // status via rpc. -func NewAllocWatcher(c Config) (PrevAllocWatcher, PrevAllocMigrator) { +func NewAllocWatcher(c Config) (config.PrevAllocWatcher, config.PrevAllocMigrator) { if c.Alloc.PreviousAllocation == "" && c.PreemptedRunners == nil { return NoopPrevAlloc{}, NoopPrevAlloc{} } - var prevAllocWatchers []PrevAllocWatcher - var prevAllocMigrator PrevAllocMigrator = NoopPrevAlloc{} + var prevAllocWatchers []config.PrevAllocWatcher + var prevAllocMigrator config.PrevAllocMigrator = NoopPrevAlloc{} // We have a previous allocation, add its listener to the watchers, and // use a migrator. diff --git a/client/allocwatcher/group_alloc_watcher.go b/client/allocwatcher/group_alloc_watcher.go index 96d4afb6fb0..d4497058965 100644 --- a/client/allocwatcher/group_alloc_watcher.go +++ b/client/allocwatcher/group_alloc_watcher.go @@ -8,10 +8,12 @@ import ( "sync" multierror "github.com/hashicorp/go-multierror" + + "github.com/hashicorp/nomad/client/config" ) type groupPrevAllocWatcher struct { - prevAllocs []PrevAllocWatcher + prevAllocs []config.PrevAllocWatcher wg sync.WaitGroup // waiting and migrating are true when alloc runner is waiting on the @@ -21,7 +23,7 @@ type groupPrevAllocWatcher struct { waitingLock sync.RWMutex } -func NewGroupAllocWatcher(watchers ...PrevAllocWatcher) PrevAllocWatcher { +func NewGroupAllocWatcher(watchers ...config.PrevAllocWatcher) config.PrevAllocWatcher { return &groupPrevAllocWatcher{ prevAllocs: watchers, } @@ -48,7 +50,7 @@ func (g *groupPrevAllocWatcher) Wait(ctx context.Context) error { g.wg.Add(len(g.prevAllocs)) for _, alloc := range g.prevAllocs { - go func(ctx context.Context, alloc PrevAllocWatcher) { + go func(ctx context.Context, alloc config.PrevAllocWatcher) { defer g.wg.Done() err := alloc.Wait(ctx) if err != nil { diff --git a/client/allocwatcher/group_alloc_watcher_test.go b/client/allocwatcher/group_alloc_watcher_test.go index 88275466335..4754e0fec83 100644 --- a/client/allocwatcher/group_alloc_watcher_test.go +++ b/client/allocwatcher/group_alloc_watcher_test.go @@ -9,6 +9,7 @@ import ( "testing" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/require" @@ -28,7 +29,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_Block(t *testing.T) { waiter, _ := NewAllocWatcher(conf) - groupWaiter := &groupPrevAllocWatcher{prevAllocs: []PrevAllocWatcher{waiter}} + groupWaiter := &groupPrevAllocWatcher{prevAllocs: []config.PrevAllocWatcher{waiter}} // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) @@ -102,7 +103,7 @@ func TestPrevAlloc_GroupPrevAllocWatcher_BlockMulti(t *testing.T) { waiter2, _ := NewAllocWatcher(conf2) groupWaiter := &groupPrevAllocWatcher{ - prevAllocs: []PrevAllocWatcher{ + prevAllocs: []config.PrevAllocWatcher{ waiter1, waiter2, }, diff --git a/client/client.go b/client/client.go index 3f8d237a7b6..eb8fe440b77 100644 --- a/client/client.go +++ b/client/client.go @@ -43,7 +43,6 @@ import ( "github.com/hashicorp/nomad/client/serviceregistration/wrapper" "github.com/hashicorp/nomad/client/state" "github.com/hashicorp/nomad/client/stats" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/client/vaultclient" "github.com/hashicorp/nomad/command/agent/consul" "github.com/hashicorp/nomad/helper" @@ -57,7 +56,6 @@ import ( nconfig "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/plugins/csi" "github.com/hashicorp/nomad/plugins/device" - "github.com/hashicorp/nomad/plugins/drivers" vaultapi "github.com/hashicorp/vault/api" "github.com/shirou/gopsutil/v3/host" "golang.org/x/exp/maps" @@ -138,40 +136,6 @@ type ClientStatsReporter interface { LatestHostStats() *stats.HostStats } -// AllocRunner is the interface implemented by the core alloc runner. -// TODO Create via factory to allow testing Client with mock AllocRunners. -type AllocRunner interface { - Alloc() *structs.Allocation - AllocState() *arstate.State - Destroy() - Shutdown() - GetAllocDir() *allocdir.AllocDir - IsDestroyed() bool - IsMigrating() bool - IsWaiting() bool - Listener() *cstructs.AllocListener - Restore() error - Run() - StatsReporter() interfaces.AllocStatsReporter - Update(*structs.Allocation) - WaitCh() <-chan struct{} - DestroyCh() <-chan struct{} - ShutdownCh() <-chan struct{} - Signal(taskName, signal string) error - GetTaskEventHandler(taskName string) drivermanager.EventHandler - PersistState() error - AcknowledgeState(*arstate.State) - LastAcknowledgedStateIsCurrent(*structs.Allocation) bool - - RestartTask(taskName string, taskEvent *structs.TaskEvent) error - RestartRunning(taskEvent *structs.TaskEvent) error - RestartAll(taskEvent *structs.TaskEvent) error - Reconnect(update *structs.Allocation) error - - GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler - GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) -} - // Client is used to implement the client interaction with Nomad. Clients // are expected to register as a schedule-able node to the servers, and to // run allocations as determined by the servers. @@ -237,9 +201,12 @@ type Client struct { // allocs maps alloc IDs to their AllocRunner. This map includes all // AllocRunners - running and GC'd - until the server GCs them. - allocs map[string]AllocRunner + allocs map[string]interfaces.AllocRunner allocLock sync.RWMutex + // allocrunnerFactory is the function called to create new allocrunners + allocrunnerFactory config.AllocRunnerFactory + // invalidAllocs is a map that tracks allocations that failed because // the client couldn't initialize alloc or task runners for it. This can // happen due to driver errors @@ -398,7 +365,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie streamingRpcs: structs.NewStreamingRpcRegistry(), logger: logger, rpcLogger: logger.Named("rpc"), - allocs: make(map[string]AllocRunner), + allocs: make(map[string]interfaces.AllocRunner), allocUpdates: make(chan *structs.Allocation, 64), shutdownCh: make(chan struct{}), triggerDiscoveryCh: make(chan struct{}), @@ -411,6 +378,12 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie cpusetManager: cgutil.CreateCPUSetManager(cfg.CgroupParent, cfg.ReservableCores, logger), getter: getter.New(cfg.Artifact, logger), EnterpriseClient: newEnterpriseClient(logger), + allocrunnerFactory: cfg.AllocRunnerFactory, + } + + // we can't have this set in the default Config because of import cycles + if c.allocrunnerFactory == nil { + c.allocrunnerFactory = allocrunner.NewAllocRunner } c.batchNodeUpdates = newBatchNodeUpdates( @@ -986,7 +959,7 @@ func (c *Client) Node() *structs.Node { // getAllocRunner returns an AllocRunner or an UnknownAllocation error if the // client has no runner for the given alloc ID. -func (c *Client) getAllocRunner(allocID string) (AllocRunner, error) { +func (c *Client) getAllocRunner(allocID string) (interfaces.AllocRunner, error) { c.allocLock.RLock() defer c.allocLock.RUnlock() @@ -1230,7 +1203,7 @@ func (c *Client) restoreState() error { prevAllocWatcher := allocwatcher.NoopPrevAlloc{} prevAllocMigrator := allocwatcher.NoopPrevAlloc{} - arConf := &allocrunner.Config{ + arConf := &config.AllocRunnerConfig{ Alloc: alloc, Logger: c.logger, ClientConfig: conf, @@ -1255,7 +1228,7 @@ func (c *Client) restoreState() error { Getter: c.getter, } - ar, err := allocrunner.NewAllocRunner(arConf) + ar, err := c.allocrunnerFactory(arConf) if err != nil { c.logger.Error("error running alloc", "error", err, "alloc_id", alloc.ID) c.handleInvalidAllocs(alloc, err) @@ -1364,7 +1337,7 @@ func (c *Client) saveState() error { wg.Add(len(runners)) for id, ar := range runners { - go func(id string, ar AllocRunner) { + go func(id string, ar interfaces.AllocRunner) { err := ar.PersistState() if err != nil { c.logger.Error("error saving alloc state", "error", err, "alloc_id", id) @@ -1381,10 +1354,10 @@ func (c *Client) saveState() error { } // getAllocRunners returns a snapshot of the current set of alloc runners. -func (c *Client) getAllocRunners() map[string]AllocRunner { +func (c *Client) getAllocRunners() map[string]interfaces.AllocRunner { c.allocLock.RLock() defer c.allocLock.RUnlock() - runners := make(map[string]AllocRunner, len(c.allocs)) + runners := make(map[string]interfaces.AllocRunner, len(c.allocs)) for id, ar := range c.allocs { runners[id] = ar } @@ -2673,7 +2646,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error } prevAllocWatcher, prevAllocMigrator := allocwatcher.NewAllocWatcher(watcherConfig) - arConf := &allocrunner.Config{ + arConf := &config.AllocRunnerConfig{ Alloc: alloc, Logger: c.logger, ClientConfig: c.GetConfig(), @@ -2697,7 +2670,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error Getter: c.getter, } - ar, err := allocrunner.NewAllocRunner(arConf) + ar, err := c.allocrunnerFactory(arConf) if err != nil { return err } diff --git a/client/client_interface_test.go b/client/client_interface_test.go new file mode 100644 index 00000000000..5a985b7bce4 --- /dev/null +++ b/client/client_interface_test.go @@ -0,0 +1,165 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package client + +import ( + "sync" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/allocrunner/state" + "github.com/hashicorp/nomad/client/config" + cinterfaces "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/device" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/testutil" +) + +// TestEmptyAllocRunner demonstrates the minimum interface necessary to +// implement a mock AllocRunner that can report client status back to the server +func TestEmptyAllocRunner(t *testing.T) { + ci.Parallel(t) + + s1, _, cleanupS1 := testServer(t, nil) + defer cleanupS1() + + _, cleanup := TestClient(t, func(c *config.Config) { + c.RPCHandler = s1 + c.AllocRunnerFactory = newEmptyAllocRunnerFunc + }) + defer cleanup() + + job := mock.Job() + job.Constraints = nil + job.TaskGroups[0].Constraints = nil + job.TaskGroups[0].Count = 1 + task := job.TaskGroups[0].Tasks[0] + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "10s", + } + task.Services = nil + + // WaitForRunning polls the server until the ClientStatus is running + testutil.WaitForRunning(t, s1.RPC, job) +} + +type emptyAllocRunner struct { + c cinterfaces.AllocStateHandler + alloc *structs.Allocation + allocState *state.State + allocLock sync.RWMutex +} + +func newEmptyAllocRunnerFunc(conf *config.AllocRunnerConfig) (interfaces.AllocRunner, error) { + return &emptyAllocRunner{ + c: conf.StateUpdater, + alloc: conf.Alloc, + allocState: &state.State{}, + }, nil +} + +func (ar *emptyAllocRunner) Alloc() *structs.Allocation { + ar.allocLock.RLock() + defer ar.allocLock.RUnlock() + return ar.alloc.Copy() +} + +func (ar *emptyAllocRunner) Run() { + ar.allocLock.RLock() + defer ar.allocLock.RUnlock() + ar.alloc.ClientStatus = "running" + ar.c.AllocStateUpdated(ar.alloc) +} + +func (ar *emptyAllocRunner) Restore() error { return nil } +func (ar *emptyAllocRunner) Update(update *structs.Allocation) { + ar.allocLock.Lock() + defer ar.allocLock.Unlock() + ar.alloc = update +} + +func (ar *emptyAllocRunner) Reconnect(update *structs.Allocation) error { + ar.allocLock.Lock() + defer ar.allocLock.Unlock() + ar.alloc = update + return nil +} + +func (ar *emptyAllocRunner) Shutdown() {} +func (ar *emptyAllocRunner) Destroy() {} + +func (ar *emptyAllocRunner) IsDestroyed() bool { return false } +func (ar *emptyAllocRunner) IsMigrating() bool { return false } +func (ar *emptyAllocRunner) IsWaiting() bool { return false } + +func (ar *emptyAllocRunner) WaitCh() <-chan struct{} { return make(chan struct{}) } + +func (ar *emptyAllocRunner) DestroyCh() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func (ar *emptyAllocRunner) ShutdownCh() <-chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +} + +func (ar *emptyAllocRunner) AllocState() *state.State { + ar.allocLock.RLock() + defer ar.allocLock.RUnlock() + return ar.allocState.Copy() +} + +func (ar *emptyAllocRunner) PersistState() error { return nil } +func (ar *emptyAllocRunner) AcknowledgeState(*state.State) {} +func (ar *emptyAllocRunner) LastAcknowledgedStateIsCurrent(*structs.Allocation) bool { return false } + +func (ar *emptyAllocRunner) SetClientStatus(status string) { + ar.allocLock.Lock() + defer ar.allocLock.Unlock() + ar.alloc.ClientStatus = status +} + +func (ar *emptyAllocRunner) Signal(taskName, signal string) error { return nil } +func (ar *emptyAllocRunner) RestartTask(taskName string, taskEvent *structs.TaskEvent) error { + return nil +} +func (ar *emptyAllocRunner) RestartRunning(taskEvent *structs.TaskEvent) error { return nil } +func (ar *emptyAllocRunner) RestartAll(taskEvent *structs.TaskEvent) error { return nil } + +func (ar *emptyAllocRunner) GetTaskEventHandler(taskName string) drivermanager.EventHandler { + return nil +} +func (ar *emptyAllocRunner) GetTaskExecHandler(taskName string) drivermanager.TaskExecHandler { + return nil +} +func (ar *emptyAllocRunner) GetTaskDriverCapabilities(taskName string) (*drivers.Capabilities, error) { + return nil, nil +} + +func (ar *emptyAllocRunner) StatsReporter() interfaces.AllocStatsReporter { return ar } +func (ar *emptyAllocRunner) Listener() *cstructs.AllocListener { return nil } +func (ar *emptyAllocRunner) GetAllocDir() *allocdir.AllocDir { return nil } + +// LatestAllocStats lets this empty runner implement AllocStatsReporter +func (ar *emptyAllocRunner) LatestAllocStats(taskFilter string) (*cstructs.AllocResourceUsage, error) { + return &cstructs.AllocResourceUsage{ + ResourceUsage: &cstructs.ResourceUsage{ + MemoryStats: &cstructs.MemoryStats{}, + CpuStats: &cstructs.CpuStats{}, + DeviceStats: []*device.DeviceGroupStats{}, + }, + Tasks: map[string]*cstructs.TaskResourceUsage{}, + Timestamp: 0, + }, nil +} diff --git a/client/client_test.go b/client/client_test.go index 10215da8617..a84fd9cf914 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1932,7 +1932,7 @@ func TestClient_ReconnectAllocs(t *testing.T) { c1.runAllocs(updates) invalid := false - var runner AllocRunner + var runner interfaces.AllocRunner var finalAlloc *structs.Allocation // Ensure the allocation is not invalid on the client and has been marked // running on the server with the new modify index diff --git a/client/config/arconfig.go b/client/config/arconfig.go new file mode 100644 index 00000000000..7496bc26619 --- /dev/null +++ b/client/config/arconfig.go @@ -0,0 +1,136 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +package config + +import ( + "context" + + log "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/nomad/client/allocdir" + arinterfaces "github.com/hashicorp/nomad/client/allocrunner/interfaces" + "github.com/hashicorp/nomad/client/consul" + "github.com/hashicorp/nomad/client/devicemanager" + "github.com/hashicorp/nomad/client/dynamicplugins" + "github.com/hashicorp/nomad/client/interfaces" + "github.com/hashicorp/nomad/client/lib/cgutil" + "github.com/hashicorp/nomad/client/pluginmanager/csimanager" + "github.com/hashicorp/nomad/client/pluginmanager/drivermanager" + "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/client/serviceregistration/checks/checkstore" + "github.com/hashicorp/nomad/client/serviceregistration/wrapper" + cstate "github.com/hashicorp/nomad/client/state" + "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/nomad/structs" +) + +// AllocRunnerFactory returns an AllocRunner interface built from the +// configuration. Note: the type for config is any because we can't count on +// test callers being able to make a real allocrunner.Config without an circular +// import +type AllocRunnerFactory func(*AllocRunnerConfig) (arinterfaces.AllocRunner, error) + +// RPCer is the interface needed by hooks to make RPC calls. +type RPCer interface { + RPC(method string, args interface{}, reply interface{}) error +} + +// AllocRunnerConfig holds the configuration for creating an allocation runner. +type AllocRunnerConfig struct { + // Logger is the logger for the allocation runner. + Logger log.Logger + + // ClientConfig is the clients configuration. + ClientConfig *Config + + // Alloc captures the allocation that should be run. + Alloc *structs.Allocation + + // StateDB is used to store and restore state. + StateDB cstate.StateDB + + // Consul is the Consul client used to register task services and checks + Consul serviceregistration.Handler + + // ConsulProxies is the Consul client used to lookup supported envoy versions + // of the Consul agent. + ConsulProxies consul.SupportedProxiesAPI + + // ConsulSI is the Consul client used to manage service identity tokens. + ConsulSI consul.ServiceIdentityAPI + + // Vault is the Vault client to use to retrieve Vault tokens + Vault vaultclient.VaultClient + + // StateUpdater is used to emit updated task state + StateUpdater interfaces.AllocStateHandler + + // DeviceStatsReporter is used to lookup resource usage for alloc devices + DeviceStatsReporter interfaces.DeviceStatsReporter + + // PrevAllocWatcher handles waiting on previous or preempted allocations + PrevAllocWatcher PrevAllocWatcher + + // PrevAllocMigrator allows the migration of a previous allocations alloc dir + PrevAllocMigrator PrevAllocMigrator + + // DynamicRegistry contains all locally registered dynamic plugins (e.g csi + // plugins). + DynamicRegistry dynamicplugins.Registry + + // CSIManager is used to wait for CSI Volumes to be attached, and by the task + // runner to manage their mounting + CSIManager csimanager.Manager + + // DeviceManager is used to mount devices as well as lookup device + // statistics + DeviceManager devicemanager.Manager + + // DriverManager handles dispensing of driver plugins + DriverManager drivermanager.Manager + + // CpusetManager configures the cpuset cgroup if supported by the platform + CpusetManager cgutil.CpusetManager + + // ServersContactedCh is closed when the first GetClientAllocs call to + // servers succeeds and allocs are synced. + ServersContactedCh chan struct{} + + // RPCClient is the RPC Client that should be used by the allocrunner and its + // hooks to communicate with Nomad Servers. + RPCClient RPCer + + // ServiceRegWrapper is the handler wrapper that is used by service hooks + // to perform service and check registration and deregistration. + ServiceRegWrapper *wrapper.HandlerWrapper + + // CheckStore contains check result information. + CheckStore checkstore.Shim + + // Getter is an interface for retrieving artifacts. + Getter interfaces.ArtifactGetter +} + +// PrevAllocWatcher allows AllocRunners to wait for a previous allocation to +// terminate whether or not the previous allocation is local or remote. +// See `PrevAllocMigrator` for migrating workloads. +type PrevAllocWatcher interface { + // Wait for previous alloc to terminate + Wait(context.Context) error + + // IsWaiting returns true if a concurrent caller is blocked in Wait + IsWaiting() bool +} + +// PrevAllocMigrator allows AllocRunners to migrate a previous allocation +// whether or not the previous allocation is local or remote. +type PrevAllocMigrator interface { + PrevAllocWatcher + + // IsMigrating returns true if a concurrent caller is in Migrate + IsMigrating() bool + + // Migrate data from previous alloc + Migrate(ctx context.Context, dest *allocdir.AllocDir) error +} diff --git a/client/config/config.go b/client/config/config.go index e46eda76d0d..d4a1c474ae1 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -250,6 +250,8 @@ type Config struct { // StateDBFactory is used to override stateDB implementations, StateDBFactory state.NewStateDBFunc + AllocRunnerFactory AllocRunnerFactory + // CNIPath is the path used to search for CNI plugins. Multiple paths can // be specified with colon delimited CNIPath string diff --git a/client/gc.go b/client/gc.go index 0c95fd7f831..4bf319b44b4 100644 --- a/client/gc.go +++ b/client/gc.go @@ -10,6 +10,8 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/nomad/structs" ) @@ -173,7 +175,7 @@ func (a *AllocGarbageCollector) keepUsageBelowThreshold() error { // destroyAllocRunner is used to destroy an allocation runner. It will acquire a // lock to restrict parallelism and then destroy the alloc runner, returning // once the allocation has been destroyed. -func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar AllocRunner, reason string) { +func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar interfaces.AllocRunner, reason string) { a.logger.Info("garbage collecting allocation", "alloc_id", allocID, "reason", reason) // Acquire the destroy lock @@ -338,7 +340,7 @@ func (a *AllocGarbageCollector) MakeRoomFor(allocations []*structs.Allocation) e } // MarkForCollection starts tracking an allocation for Garbage Collection -func (a *AllocGarbageCollector) MarkForCollection(allocID string, ar AllocRunner) { +func (a *AllocGarbageCollector) MarkForCollection(allocID string, ar interfaces.AllocRunner) { if a.allocRunners.Push(allocID, ar) { a.logger.Info("marking allocation for GC", "alloc_id", allocID) } @@ -349,7 +351,7 @@ func (a *AllocGarbageCollector) MarkForCollection(allocID string, ar AllocRunner type GCAlloc struct { timeStamp time.Time allocID string - allocRunner AllocRunner + allocRunner interfaces.AllocRunner index int } @@ -403,7 +405,7 @@ func NewIndexedGCAllocPQ() *IndexedGCAllocPQ { // Push an alloc runner into the GC queue. Returns true if alloc was added, // false if the alloc already existed. -func (i *IndexedGCAllocPQ) Push(allocID string, ar AllocRunner) bool { +func (i *IndexedGCAllocPQ) Push(allocID string, ar interfaces.AllocRunner) bool { i.pqLock.Lock() defer i.pqLock.Unlock() diff --git a/client/gc_test.go b/client/gc_test.go index 991478a7966..c5c820c4412 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -10,6 +10,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocrunner" + "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/stats" "github.com/hashicorp/nomad/helper/testlog" @@ -32,7 +33,7 @@ func gcConfig() *GCConfig { // exitAllocRunner is a helper that updates the allocs on the given alloc // runners to be terminal -func exitAllocRunner(runners ...AllocRunner) { +func exitAllocRunner(runners ...interfaces.AllocRunner) { for _, ar := range runners { terminalAlloc := ar.Alloc().Copy() terminalAlloc.DesiredStatus = structs.AllocDesiredStatusStop diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go index d13894de19f..79e1e4765a8 100644 --- a/client/heartbeatstop.go +++ b/client/heartbeatstop.go @@ -8,6 +8,8 @@ import ( "time" hclog "github.com/hashicorp/go-hclog" + + "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/nomad/structs" ) @@ -16,14 +18,14 @@ type heartbeatStop struct { startupGrace time.Time allocInterval map[string]time.Duration allocHookCh chan *structs.Allocation - getRunner func(string) (AllocRunner, error) + getRunner func(string) (interfaces.AllocRunner, error) logger hclog.InterceptLogger shutdownCh chan struct{} lock *sync.RWMutex } func newHeartbeatStop( - getRunner func(string) (AllocRunner, error), + getRunner func(string) (interfaces.AllocRunner, error), timeout time.Duration, logger hclog.InterceptLogger, shutdownCh chan struct{}) *heartbeatStop { diff --git a/client/state/upgrade_int_test.go b/client/state/upgrade_int_test.go index 30a0b60c7c5..8832424b463 100644 --- a/client/state/upgrade_int_test.go +++ b/client/state/upgrade_int_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocrunner" "github.com/hashicorp/nomad/client/allocwatcher" + "github.com/hashicorp/nomad/client/config" clientconfig "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/devicemanager" dmstate "github.com/hashicorp/nomad/client/devicemanager/state" @@ -200,7 +201,7 @@ func checkUpgradedAlloc(t *testing.T, path string, db StateDB, alloc *structs.Al clientConf.StateDir = path - conf := &allocrunner.Config{ + conf := &config.AllocRunnerConfig{ Alloc: alloc, Logger: clientConf.Logger, ClientConfig: clientConf, diff --git a/drivers/mock/driver_test.go b/drivers/mock/driver_test.go index f12155705c1..add9563e54f 100644 --- a/drivers/mock/driver_test.go +++ b/drivers/mock/driver_test.go @@ -16,7 +16,6 @@ import ( "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/taskenv" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/testtask" @@ -158,7 +157,7 @@ func mkTestAllocDir(t *testing.T, h *dtestutil.DriverHarness, logger hclog.Logge } taskBuilder := taskenv.NewBuilder(mock.Node(), alloc, task, "global") - dtestutil.SetEnvvars(taskBuilder, drivers.FSIsolationNone, taskDir, config.DefaultConfig()) + dtestutil.SetEnvvars(taskBuilder, drivers.FSIsolationNone, taskDir) taskEnv := taskBuilder.Build() if tc.Env == nil { diff --git a/plugins/drivers/testutils/testing.go b/plugins/drivers/testutils/testing.go index ab966ebf4d7..11855a9b9b3 100644 --- a/plugins/drivers/testutils/testing.go +++ b/plugins/drivers/testutils/testing.go @@ -9,14 +9,12 @@ import ( "os" "path/filepath" "runtime" - "strings" "time" hclog "github.com/hashicorp/go-hclog" plugin "github.com/hashicorp/go-plugin" "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/lib/cgutil" "github.com/hashicorp/nomad/client/logmon" "github.com/hashicorp/nomad/client/taskenv" @@ -148,7 +146,7 @@ func (h *DriverHarness) MkAllocDir(t *drivers.TaskConfig, enableLogs bool) func( } taskBuilder := taskenv.NewBuilder(mock.Node(), alloc, task, "global") - SetEnvvars(taskBuilder, fsi, taskDir, config.DefaultConfig()) + SetEnvvars(taskBuilder, fsi, taskDir) taskEnv := taskBuilder.Build() if t.Env == nil { @@ -294,7 +292,7 @@ func (d *MockDriver) ExecTaskStreaming(ctx context.Context, taskID string, execO } // SetEnvvars sets path and host env vars depending on the FS isolation used. -func SetEnvvars(envBuilder *taskenv.Builder, fsi drivers.FSIsolation, taskDir *allocdir.TaskDir, conf *config.Config) { +func SetEnvvars(envBuilder *taskenv.Builder, fsi drivers.FSIsolation, taskDir *allocdir.TaskDir) { envBuilder.SetClientTaskRoot(taskDir.Dir) envBuilder.SetClientSharedAllocDir(taskDir.SharedAllocDir) @@ -317,11 +315,6 @@ func SetEnvvars(envBuilder *taskenv.Builder, fsi drivers.FSIsolation, taskDir *a // Set the host environment variables for non-image based drivers if fsi != drivers.FSIsolationImage { - // COMPAT(1.0) using inclusive language, blacklist is kept for backward compatibility. - filter := strings.Split(conf.ReadAlternativeDefault( - []string{"env.denylist", "env.blacklist"}, - config.DefaultEnvDenylist, - ), ",") - envBuilder.SetHostEnvvars(filter) + envBuilder.SetHostEnvvars([]string{"env.denylist"}) } }