diff --git a/docs/development/extensions-contrib/k8s-jobs.md b/docs/development/extensions-contrib/k8s-jobs.md index 913e40b93733..11663642bdd1 100644 --- a/docs/development/extensions-contrib/k8s-jobs.md +++ b/docs/development/extensions-contrib/k8s-jobs.md @@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`. |`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No| |`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No| |`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No| +|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No| |`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed, on a resource constrained cluster it may take some time. |`PT1H`|No| |`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No| |`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java index fd6ae4bd6f18..d4b061078aa5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesPeonLifecycle.java @@ -23,12 +23,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodStatus; import io.fabric8.kubernetes.api.model.batch.v1.Job; import io.fabric8.kubernetes.client.dsl.LogWatch; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.task.Task; @@ -94,7 +97,7 @@ protected enum State @MonotonicNonNull private LogWatch logWatch; - private TaskLocation taskLocation; + private final SettableFuture taskLocation = SettableFuture.create(); protected KubernetesPeonLifecycle( Task task, @@ -130,8 +133,6 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout, writeTaskPayload(task); } - // In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation. - taskLocation = null; kubernetesClient.launchPeonJobAndWaitForStart( job, task, @@ -183,8 +184,8 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateExceptio since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher if we decide we need to change this later. **/ - taskLocation = getTaskLocationFromK8s(); updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING); + taskLocation.set(getTaskLocationFromK8s()); JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion( taskId, @@ -260,11 +261,19 @@ protected TaskLocation getTaskLocation() since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher if we decide we need to change this later. **/ - if (taskLocation == null) { + if (!taskLocation.isDone()) { log.warn("Unknown task location for [%s]", taskId); return TaskLocation.unknown(); } + return FutureUtils.getUncheckedImmediately(taskLocation); + } + + /** + * Get a future that resolves to the task location, when available. + */ + public ListenableFuture getTaskLocationAsync() + { return taskLocation; } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java index c324b49e13a2..1e3d69d4c507 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java @@ -29,6 +29,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.fabric8.kubernetes.api.model.batch.v1.Job; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -55,12 +56,14 @@ import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter; import org.apache.druid.tasklogs.TaskLogStreamer; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Duration; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -151,11 +154,10 @@ public ListenableFuture run(Task task) } } - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { synchronized (tasks) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task)))); } } @@ -270,7 +272,6 @@ public void shutdown(String taskid, String reason) synchronized (tasks) { tasks.remove(taskid); } - } @Override @@ -321,16 +322,41 @@ public List>> restore() public void start() { log.info("Starting K8sTaskRunner..."); - // Load tasks from previously running jobs and wait for their statuses to be updated asynchronously. - for (Job job : client.getPeonJobs()) { + + // Load tasks from previously running jobs and wait for their locations to be discovered. + final List> taskLocationFutures = new ArrayList<>(); + final List peonJobs = client.getPeonJobs(); + + log.info("Locating [%,d] active tasks.", peonJobs.size()); + + for (final Job job : peonJobs) { try { - joinAsync(adapter.toTask(job)); + taskLocationFutures.add(joinAsync(adapter.toTask(job)).getTaskLocationAsync()); } catch (IOException e) { log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName()); } } - log.info("Loaded %,d tasks from previous run", tasks.size()); + + try { + final DateTime nowUtc = DateTimes.nowUtc(); + final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis(); + if (timeoutMs > 0) { + FutureUtils.coalesce(taskLocationFutures).get(timeoutMs, TimeUnit.MILLISECONDS); + } + log.info("Located [%,d] active tasks.", taskLocationFutures.size()); + } + catch (Exception e) { + final long numInitialized = + tasks.values().stream().filter(item -> item.getTaskLocationAsync().isDone()).count(); + log.warn( + e, + "Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.", + numInitialized, + taskLocationFutures.size(), + config.getTaskJoinTimeout() + ); + } cleanupExecutor.scheduleAtFixedRate( () -> @@ -350,6 +376,7 @@ public void start() public void stop() { log.debug("Stopping KubernetesTaskRunner"); + exec.shutdownNow(); cleanupExecutor.shutdownNow(); log.debug("Stopped KubernetesTaskRunner"); } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java index 60efa3c48569..91235ee8add5 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerConfig.java @@ -85,6 +85,11 @@ public class KubernetesTaskRunnerConfig // interval for k8s job cleanup to run private Period taskCleanupInterval = new Period("PT10m"); + @JsonProperty + @NotNull + // how long to wait to join peon k8s jobs on startup + private Period taskJoinTimeout = new Period("PT1M"); + @JsonProperty @NotNull // how long to wait for the peon k8s job to launch @@ -248,6 +253,11 @@ public Period getTaskTimeout() return maxTaskDuration; } + public Period getTaskJoinTimeout() + { + return taskJoinTimeout; + } + public Period getTaskCleanupDelay() { return taskCleanupDelay; diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java index 94d4bbb67f63..44a3ee538d80 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesWorkItem.java @@ -20,8 +20,10 @@ package org.apache.druid.k8s.overlord; import com.google.common.base.Optional; -import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import org.apache.druid.common.guava.FutureUtils; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; @@ -34,7 +36,7 @@ public class KubernetesWorkItem extends TaskRunnerWorkItem { private final Task task; - private KubernetesPeonLifecycle kubernetesPeonLifecycle = null; + private final SettableFuture kubernetesPeonLifecycle = SettableFuture.create(); public KubernetesWorkItem(Task task, ListenableFuture statusFuture) { @@ -44,16 +46,17 @@ public KubernetesWorkItem(Task task, ListenableFuture statusFuture) protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle) { - Preconditions.checkState(this.kubernetesPeonLifecycle == null); - this.kubernetesPeonLifecycle = kubernetesPeonLifecycle; + if (!this.kubernetesPeonLifecycle.set(kubernetesPeonLifecycle)) { + throw DruidException.defensive("Unexpected call to setKubernetesPeonLifecycle, can only call this once."); + } } protected synchronized void shutdown() { - - if (this.kubernetesPeonLifecycle != null) { - this.kubernetesPeonLifecycle.startWatchingLogs(); - this.kubernetesPeonLifecycle.shutdown(); + if (kubernetesPeonLifecycle.isDone()) { + final KubernetesPeonLifecycle lifecycle = FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle); + lifecycle.startWatchingLogs(); + lifecycle.shutdown(); } } @@ -69,11 +72,11 @@ protected boolean isRunning() protected RunnerTaskState getRunnerTaskState() { - if (kubernetesPeonLifecycle == null) { + if (!kubernetesPeonLifecycle.isDone()) { return RunnerTaskState.PENDING; } - switch (kubernetesPeonLifecycle.getState()) { + switch (FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getState()) { case NOT_STARTED: case PENDING: return RunnerTaskState.PENDING; @@ -88,19 +91,19 @@ protected RunnerTaskState getRunnerTaskState() protected Optional streamTaskLogs() { - if (kubernetesPeonLifecycle == null) { + if (!kubernetesPeonLifecycle.isDone()) { return Optional.absent(); } - return kubernetesPeonLifecycle.streamLogs(); + return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).streamLogs(); } @Override public TaskLocation getLocation() { - if (kubernetesPeonLifecycle == null) { + if (!kubernetesPeonLifecycle.isDone()) { return TaskLocation.unknown(); } - return kubernetesPeonLifecycle.getTaskLocation(); + return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getTaskLocation(); } @Override @@ -119,4 +122,12 @@ public Task getTask() { return task; } + + /** + * Future that resolves when the work item's lifecycle has been initialized. + */ + public ListenableFuture getTaskLocationAsync() + { + return FutureUtils.transformAsync(kubernetesPeonLifecycle, KubernetesPeonLifecycle::getTaskLocationAsync); + } } diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java index 67a5278c6a32..be9d5ef091f7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesTaskRunnerTest.java @@ -113,15 +113,18 @@ public void test_start_withExistingJobs() throws IOException ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent( + final KubernetesWorkItem workItem = tasks.computeIfAbsent( task.getId(), k -> new KubernetesWorkItem( task, Futures.immediateFuture(TaskStatus.success(task.getId())) ) - ).getResult(); + ); + + workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle); + return workItem; } }; @@ -133,6 +136,8 @@ protected ListenableFuture joinAsync(Task task) EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job)); EasyMock.expect(taskAdapter.toTask(job)).andReturn(task); + EasyMock.expect(kubernetesPeonLifecycle.getTaskLocationAsync()) + .andReturn(Futures.immediateFuture(TaskLocation.create("localhost", 1234, -1))); replayAll(); @@ -142,6 +147,8 @@ protected ListenableFuture joinAsync(Task task) Assert.assertNotNull(runner.tasks); Assert.assertEquals(1, runner.tasks.size()); + + runner.stop(); } @Test @@ -157,10 +164,9 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOE ) { @Override - protected ListenableFuture joinAsync(Task task) + protected KubernetesWorkItem joinAsync(Task task) { - return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)) - .getResult(); + return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null)); } }; @@ -181,6 +187,8 @@ protected ListenableFuture joinAsync(Task task) Assert.assertNotNull(runner.tasks); Assert.assertEquals(0, runner.tasks.size()); + + runner.stop(); } @Test @@ -286,8 +294,8 @@ public void test_join_withoutExistingTask() throws ExecutionException, Interrupt replayAll(); - ListenableFuture future = runner.joinAsync(task); - Assert.assertEquals(taskStatus, future.get()); + KubernetesWorkItem workItem = runner.joinAsync(task); + Assert.assertEquals(taskStatus, workItem.getResult().get()); verifyAll(); } @@ -310,7 +318,8 @@ public void test_join_whenExceptionThrown_throwsRuntimeException() replayAll(); - ListenableFuture future = runner.joinAsync(task); + KubernetesWorkItem workItem = runner.joinAsync(task); + final ListenableFuture future = workItem.getResult(); Exception e = Assert.assertThrows(ExecutionException.class, future::get); Assert.assertTrue(e.getCause() instanceof RuntimeException); diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java index 7d17193b1714..c4fa1f74157a 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesWorkItemTest.java @@ -19,6 +19,7 @@ package org.apache.druid.k8s.overlord; +import org.apache.druid.error.DruidException; import org.apache.druid.indexer.RunnerTaskState; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexing.common.task.NoopTask; @@ -60,7 +61,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException() )); Assert.assertThrows( - IllegalStateException.class, + DruidException.class, () -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle( task, null,