From e7ca5f5e4cb59feb71e2b369f7055131f98c0ead Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 25 Oct 2024 20:08:18 -0700 Subject: [PATCH 1/3] KubernetesTaskRunner: Wait in start() for tasks to be located. This helps with orderly Overlord failover. After this patch, by the time the task runner returns from start(), all tasks are located (subject to some timeout). This is useful for supervisors, which start next and which need to contact tasks. The timeout defaults to 1 minute, and is configurable using druid.indexer.runner.taskJoinTimeout. --- .../extensions-contrib/k8s-jobs.md | 1 + .../k8s/overlord/KubernetesPeonLifecycle.java | 24 ++++++++--- .../k8s/overlord/KubernetesTaskRunner.java | 43 +++++++++++++++---- .../overlord/KubernetesTaskRunnerConfig.java | 10 +++++ .../k8s/overlord/KubernetesWorkItem.java | 39 +++++++++++------ .../overlord/KubernetesTaskRunnerTest.java | 27 ++++++++---- 6 files changed, 108 insertions(+), 36 deletions(-) diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 913e40b93733..11663642bdd1 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`. |`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No| |`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No| |`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No| +|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No| |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. 
|`PT1H`|No| |`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No| |`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index fd6ae4bd6f18..00f1b7076e23 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -23,12 +23,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.dsl.LogWatch; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; @@ -94,7 +97,7 @@ protected enum State @MonotonicNonNull private LogWatch logWatch; - private TaskLocation taskLocation; + private final SettableFuture taskLocation = SettableFuture.create(); protected KubernetesPeonLifecycle( Task task, @@ -130,8 +133,6 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, writeTaskPayload(task); } - // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation. 
- taskLocation = null; kubernetesClient.launchPeonJobAndWaitForStart( job, task, @@ -183,7 +184,7 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher if we decide we need to change this later. **/ - taskLocation = getTaskLocationFromK8s(); + taskLocation.set(getTaskLocationFromK8s()); updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( @@ -260,11 +261,19 @@ protected TaskLocation getTaskLocation() since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher if we decide we need to change this later. **/ - if (taskLocation == null) { + if (!taskLocation.isDone()) { log.warn("Unknown task location for [%s]", taskId); return TaskLocation.unknown(); } + return FutureUtils.getUncheckedImmediately(taskLocation); + } + + /** + * Get a future that resolves to the task location, when available. 
+ */ + public ListenableFuture getTaskLocationAsync() + { return taskLocation; } @@ -393,4 +402,9 @@ protected TaskLocation getTaskLocationFromK8s() ); } + + public ListenableFuture locatedFuture() + { + return taskLocation; + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c324b49e13a2..1e3d69d4c507 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -55,12 +56,14 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -151,11 +154,10 @@ public ListenableFuture run(Task task) } } - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return 
tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); } } @@ -270,7 +272,6 @@ public void shutdown(String taskid, String reason) synchronized (tasks) { tasks.remove(taskid); } - } @Override @@ -321,16 +322,41 @@ public List>> restore() public void start() { log.info("Starting K8sTaskRunner..."); - // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. - for (Job job : client.getPeonJobs()) { + + // Load tasks from previously running jobs and wait for their locations to be discovered. + final List> taskLocationFutures = new ArrayList<>(); + final List peonJobs = client.getPeonJobs(); + + log.info("Locating [%,d] active tasks.", peonJobs.size()); + + for (final Job job : peonJobs) { try { - joinAsync(adapter.toTask(job)); + taskLocationFutures.add(joinAsync(adapter.toTask(job)).getTaskLocationAsync()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - log.info("Loaded %,d tasks from previous run", tasks.size()); + + try { + final DateTime nowUtc = DateTimes.nowUtc(); + final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis(); + if (timeoutMs > 0) { + FutureUtils.coalesce(taskLocationFutures).get(timeoutMs, TimeUnit.MILLISECONDS); + } + log.info("Located [%,d] active tasks.", taskLocationFutures.size()); + } + catch (Exception e) { + final long numInitialized = + tasks.values().stream().filter(item -> item.getTaskLocationAsync().isDone()).count(); + log.warn( + e, + "Located [%,d] out of [%,d] active tasks (timeout = %s). 
Locating others asynchronously.", + numInitialized, + taskLocationFutures.size(), + config.getTaskJoinTimeout() + ); + } cleanupExecutor.scheduleAtFixedRate( () -> @@ -350,6 +376,7 @@ public void start() public void stop() { log.debug("Stopping KubernetesTaskRunner"); + exec.shutdownNow(); cleanupExecutor.shutdownNow(); log.debug("Stopped KubernetesTaskRunner"); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 60efa3c48569..91235ee8add5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -85,6 +85,11 @@ public class KubernetesTaskRunnerConfig // interval for k8s job cleanup to run private Period taskCleanupInterval = new Period("PT10m"); + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + @JsonProperty @NotNull // how long to wait for the peon k8s job to launch @@ -248,6 +253,11 @@ public Period getTaskTimeout() return maxTaskDuration; } + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + public Period getTaskCleanupDelay() { return taskCleanupDelay; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63..44a3ee538d80 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ 
b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -20,8 +20,10 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -34,7 +36,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private final SettableFuture kubernetesPeonLifecycle = SettableFuture.create(); public KubernetesWorkItem(Task task, ListenableFuture statusFuture) { @@ -44,16 +46,17 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture) protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) { - Preconditions.checkState(this.kubernetesPeonLifecycle == null); - this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; + if (!this.kubernetesPeonLifecycle.set(kubernetesPeonLifecycle)) { + throw DruidException.defensive("Unexpected call to setKubernetesPeonLifecycle, can only call this once."); + } } protected synchronized void shutdown() { - - if (this.kubernetesPeonLifecycle != null) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + if (kubernetesPeonLifecycle.isDone()) { + final KubernetesPeonLifecycle lifecycle = FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle); + lifecycle.startWatchingLogs(); + lifecycle.shutdown(); } } @@ -69,11 +72,11 @@ protected boolean isRunning() protected RunnerTaskState getRunnerTaskState() { - if (kubernetesPeonLifecycle == null) { + if 
(!kubernetesPeonLifecycle.isDone()) { return RunnerTaskState.PENDING; } - switch (kubernetesPeonLifecycle.getState()) { + switch (FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getState()) { case NOT_STARTED: case PENDING: return RunnerTaskState.PENDING; @@ -88,19 +91,19 @@ protected RunnerTaskState getRunnerTaskState() protected Optional streamTaskLogs() { - if (kubernetesPeonLifecycle == null) { + if (!kubernetesPeonLifecycle.isDone()) { return Optional.absent(); } - return kubernetesPeonLifecycle.streamLogs(); + return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).streamLogs(); } @Override public TaskLocation getLocation() { - if (kubernetesPeonLifecycle == null) { + if (!kubernetesPeonLifecycle.isDone()) { return TaskLocation.unknown(); } - return kubernetesPeonLifecycle.getTaskLocation(); + return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getTaskLocation(); } @Override @@ -119,4 +122,12 @@ public Task getTask() { return task; } + + /** + * Future that resolves to the task location, once the lifecycle has been initialized and the task located. 
+ */ + public ListenableFuture getTaskLocationAsync() + { + return FutureUtils.transformAsync(kubernetesPeonLifecycle, KubernetesPeonLifecycle::getTaskLocationAsync); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 67a5278c6a32..be9d5ef091f7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -113,15 +113,18 @@ public void test_start_withExistingJobs() throws IOException ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent( + final KubernetesWorkItem workItem = tasks.computeIfAbsent( task.getId(), k -> new KubernetesWorkItem( task, Futures.immediateFuture(TaskStatus.success(task.getId())) ) - ).getResult(); + ); + + workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + return workItem; } }; @@ -133,6 +136,8 @@ protected ListenableFuture joinAsync(Task task) EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.getTaskLocationAsync()) + .andReturn(Futures.immediateFuture(TaskLocation.create("localhost", 1234, -1))); replayAll(); @@ -142,6 +147,8 @@ protected ListenableFuture joinAsync(Task task) Assert.assertNotNull(runner.tasks); Assert.assertEquals(1, runner.tasks.size()); + + runner.stop(); } @Test @@ -157,10 +164,9 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem 
joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)); } }; @@ -181,6 +187,8 @@ protected ListenableFuture joinAsync(Task task) Assert.assertNotNull(runner.tasks); Assert.assertEquals(0, runner.tasks.size()); + + runner.stop(); } @Test @@ -286,8 +294,8 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt replayAll(); - ListenableFuture future = runner.joinAsync(task); - Assert.assertEquals(taskStatus, future.get()); + KubernetesWorkItem workItem = runner.joinAsync(task); + Assert.assertEquals(taskStatus, workItem.getResult().get()); verifyAll(); } @@ -310,7 +318,8 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() replayAll(); - ListenableFuture future = runner.joinAsync(task); + KubernetesWorkItem workItem = runner.joinAsync(task); + final ListenableFuture future = workItem.getResult(); Exception e = Assert.assertThrows(ExecutionException.class, future::get); Assert.assertTrue(e.getCause() instanceof RuntimeException); From 0805cfbc751419a4834496835a715092eafd98a7 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Mon, 28 Oct 2024 08:33:02 -0700 Subject: [PATCH 2/3] Fix test. 
--- .../org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714..c4fa1f74157a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.NoopTask; @@ -60,7 +61,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() )); Assert.assertThrows( - IllegalStateException.class, + DruidException.class, () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( task, null, From 8a8a31cb53f7b608402b7a1df545a1a3e9e91be5 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Tue, 29 Oct 2024 10:13:17 -0700 Subject: [PATCH 3/3] Two updates. 
--- .../apache/druid/k8s/overlord/KubernetesPeonLifecycle.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index 00f1b7076e23..d4b061078aa5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -184,8 +184,8 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher if we decide we need to change this later. **/ - taskLocation.set(getTaskLocationFromK8s()); updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); + taskLocation.set(getTaskLocationFromK8s()); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, @@ -402,9 +402,4 @@ protected TaskLocation getTaskLocationFromK8s() ); } - - public ListenableFuture locatedFuture() - { - return taskLocation; - } }