KubernetesTaskRunner: Wait in start() for tasks to be located. #17419

Closed · wants to merge 3 commits
1 change: 1 addition & 0 deletions docs/development/extensions-contrib/k8s-jobs.md
@@ -610,6 +610,7 @@ Druid selects the pod template `podSpecWithHighMemRequests.yaml`.
|`druid.indexer.runner.maxTaskDuration`| `Duration` | Max time a task is allowed to run for before getting killed |`PT4H`|No|
|`druid.indexer.runner.taskCleanupDelay`| `Duration` | How long do jobs stay around before getting reaped from K8s |`P2D`|No|
|`druid.indexer.runner.taskCleanupInterval`| `Duration` | How often to check for jobs to be reaped |`PT10M`|No|
|`druid.indexer.runner.taskJoinTimeout`| `Duration` | Timeout for gathering metadata about existing tasks on startup |`PT1M`|No|
|`druid.indexer.runner.K8sjobLaunchTimeout`| `Duration` | How long to wait to launch a K8s task before marking it as failed; on a resource-constrained cluster it may take some time. |`PT1H`|No|
|`druid.indexer.runner.javaOptsArray`| `JsonArray` | java opts for the task. |`-Xmx1g`|No|
|`druid.indexer.runner.labels`| `JsonObject` | Additional labels you want to add to peon pod |`{}`|No|
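
For illustration, a hypothetical `runtime.properties` excerpt exercising the new property (values are examples only; `taskJoinTimeout` is the one setting added by this PR):

```properties
# Wait up to two minutes on Overlord startup for existing peon jobs to be located.
druid.indexer.runner.taskJoinTimeout=PT2M

# Existing settings, shown for context.
druid.indexer.runner.taskCleanupDelay=P2D
druid.indexer.runner.taskCleanupInterval=PT10M
```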
KubernetesPeonLifecycle.java

@@ -23,12 +23,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodStatus;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.task.Task;
@@ -94,7 +97,7 @@ protected enum State
@MonotonicNonNull
private LogWatch logWatch;

private TaskLocation taskLocation;
private final SettableFuture<TaskLocation> taskLocation = SettableFuture.create();

protected KubernetesPeonLifecycle(
Task task,
@@ -130,8 +133,6 @@ protected synchronized TaskStatus run(Job job, long launchTimeout, long timeout,
writeTaskPayload(task);
}

// In case something bad happens and run is called twice on this KubernetesPeonLifecycle, reset taskLocation.
taskLocation = null;
kubernetesClient.launchPeonJobAndWaitForStart(
job,
task,
@@ -183,8 +184,8 @@ protected synchronized TaskStatus join(long timeout) throws IllegalStateException
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
taskLocation = getTaskLocationFromK8s();
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);
Comment from a reviewer (Contributor):

I think we should reverse the order of these two lines. Technically, if the updateState -> RUNNING call is never made, the runner still won't know where the task is even if the getTaskLocationAsync future returns (because getTaskLocation checks the state first). In any case, we should always mark a task as running immediately after we join it (I looked through all the references to the state).

Reply from @gianm (Contributor Author, Oct 29, 2024):

I switched them. I'm not sure why the order was this way originally, but it does make more sense to me the way you're suggesting.

taskLocation.set(getTaskLocationFromK8s());

JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
taskId,
@@ -260,11 +261,19 @@ protected TaskLocation getTaskLocation()
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
if (taskLocation == null) {
if (!taskLocation.isDone()) {
log.warn("Unknown task location for [%s]", taskId);
Comment from a reviewer (Contributor):

I'm a little unsure about this logic, because I've seen instances where the K8s calls can temporarily fail, and I'm also not sure whether fabric8 actually retries these calls. See the initial trigger for #17431 and #17417.

I'd feel better if, for now, we still fell back to querying K8s for the pod location in this block (returning an immediate future). Another option would be to wrap getTaskLocationFromK8s in more retries; I can put up a separate PR if you don't want to do it in this one.

Reply from the author (Contributor Author):

What change are you suggesting here? I can make it, but I'm not sure what you mean exactly.

Reply from the reviewer (Contributor):

Basically, I think for now we should add a check for the future having completed with an exception, and retry it.

Reply from the author (Contributor Author):

I think we're deciding to address this through #17417 instead. Most of what's happening in this PR is about making sure locations are always known by the time start() exits, whereas #17417 is about making that unnecessary by fetching locations live.

return TaskLocation.unknown();
}

return FutureUtils.getUncheckedImmediately(taskLocation);
}
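
A minimal sketch of the fallback discussed in the thread above, assuming the surrounding KubernetesPeonLifecycle class (its `taskLocation` field and existing `getTaskLocationFromK8s()` method); the method name and structure are hypothetical and not part of this PR:

```java
// Hypothetical fallback (see #17417): if the location future completed
// exceptionally, retry with a live K8s lookup rather than reporting the
// location as unknown.
protected TaskLocation getTaskLocationWithFallback()
{
  if (!taskLocation.isDone()) {
    return TaskLocation.unknown(); // not yet joined; nothing to retry
  }
  try {
    return taskLocation.get(); // non-blocking, since the future is done
  }
  catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    return TaskLocation.unknown();
  }
  catch (java.util.concurrent.ExecutionException e) {
    // The original lookup failed, e.g. on a transient K8s error; retry it.
    return getTaskLocationFromK8s();
  }
}
```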

/**
* Get a future that resolves to the task location, when available.
*/
public ListenableFuture<TaskLocation> getTaskLocationAsync()
{
return taskLocation;
}

KubernetesTaskRunner.java

@@ -29,6 +29,7 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -55,12 +56,14 @@
import org.apache.druid.k8s.overlord.taskadapter.TaskAdapter;
import org.apache.druid.tasklogs.TaskLogStreamer;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.DateTime;
import org.joda.time.Duration;

import javax.annotation.Nullable;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -151,11 +154,10 @@ public ListenableFuture<TaskStatus> run(Task task)
}
}

protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected KubernetesWorkItem joinAsync(Task task)
{
synchronized (tasks) {
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, exec.submit(() -> joinTask(task))));
}
}

@@ -270,7 +272,6 @@ public void shutdown(String taskid, String reason)
synchronized (tasks) {
tasks.remove(taskid);
}

}

@Override
@@ -321,16 +322,41 @@ public List<Pair<Task, ListenableFuture<TaskStatus>>> restore()
public void start()
{
log.info("Starting K8sTaskRunner...");
// Load tasks from previously running jobs and wait for their statuses to be updated asynchronously.
for (Job job : client.getPeonJobs()) {

// Load tasks from previously running jobs and wait for their locations to be discovered.
final List<ListenableFuture<TaskLocation>> taskLocationFutures = new ArrayList<>();
final List<Job> peonJobs = client.getPeonJobs();

log.info("Locating [%,d] active tasks.", peonJobs.size());

for (final Job job : peonJobs) {
try {
joinAsync(adapter.toTask(job));
taskLocationFutures.add(joinAsync(adapter.toTask(job)).getTaskLocationAsync());
}
catch (IOException e) {
log.error(e, "Error deserializing task from job [%s]", job.getMetadata().getName());
}
}
log.info("Loaded %,d tasks from previous run", tasks.size());

try {
final DateTime nowUtc = DateTimes.nowUtc();
final long timeoutMs = nowUtc.plus(config.getTaskJoinTimeout()).getMillis() - nowUtc.getMillis();
if (timeoutMs > 0) {
FutureUtils.coalesce(taskLocationFutures).get(timeoutMs, TimeUnit.MILLISECONDS);
}
log.info("Located [%,d] active tasks.", taskLocationFutures.size());
}
catch (Exception e) {
final long numInitialized =
tasks.values().stream().filter(item -> item.getTaskLocationAsync().isDone()).count();
log.warn(
e,
"Located [%,d] out of [%,d] active tasks (timeout = %s). Locating others asynchronously.",
numInitialized,
taskLocationFutures.size(),
config.getTaskJoinTimeout()
);
}

cleanupExecutor.scheduleAtFixedRate(
() ->
@@ -350,6 +376,7 @@ public void start()
public void stop()
{
log.debug("Stopping KubernetesTaskRunner");
exec.shutdownNow();
cleanupExecutor.shutdownNow();
log.debug("Stopped KubernetesTaskRunner");
}
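
The timeout handling in start() converts the configured Joda-Time Period into milliseconds relative to the current instant (correct even for variable-length periods such as P1M), then bounds the wait on the combined location futures. A self-contained sketch of the same pattern, using Guava's allAsList as a stand-in for Druid's FutureUtils.coalesce (class name and values are illustrative):

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.joda.time.DateTime;
import org.joda.time.Period;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class JoinTimeoutSketch
{
  public static void main(String[] args) throws Exception
  {
    // Convert a Period to milliseconds relative to "now"; unlike
    // Period#toStandardDuration(), this handles variable-length fields.
    final Period taskJoinTimeout = new Period("PT1M");
    final DateTime nowUtc = DateTime.now();
    final long timeoutMs = nowUtc.plus(taskJoinTimeout).getMillis() - nowUtc.getMillis();

    // Two already-located "tasks" stand in for the real location futures.
    final List<ListenableFuture<String>> locationFutures = List.of(
        Futures.immediateFuture("host1:8100"),
        Futures.immediateFuture("host2:8100")
    );

    // Wait for every location, but never longer than the configured timeout;
    // on timeout the real runner logs a warning and keeps going.
    if (timeoutMs > 0) {
      Futures.allAsList(locationFutures).get(timeoutMs, TimeUnit.MILLISECONDS);
    }
    System.out.println("Located " + locationFutures.size() + " tasks.");
  }
}
```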
KubernetesTaskRunnerConfig.java

@@ -85,6 +85,11 @@ public class KubernetesTaskRunnerConfig
// interval for k8s job cleanup to run
private Period taskCleanupInterval = new Period("PT10m");

@JsonProperty
@NotNull
// how long to wait to join peon k8s jobs on startup
private Period taskJoinTimeout = new Period("PT1M");

@JsonProperty
@NotNull
// how long to wait for the peon k8s job to launch
@@ -248,6 +253,11 @@ public Period getTaskTimeout()
return maxTaskDuration;
}

public Period getTaskJoinTimeout()
{
return taskJoinTimeout;
}

public Period getTaskCleanupDelay()
{
return taskCleanupDelay;
KubernetesWorkItem.java

@@ -20,8 +20,10 @@
package org.apache.druid.k8s.overlord;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -34,7 +36,7 @@
public class KubernetesWorkItem extends TaskRunnerWorkItem
{
private final Task task;
private KubernetesPeonLifecycle kubernetesPeonLifecycle = null;
private final SettableFuture<KubernetesPeonLifecycle> kubernetesPeonLifecycle = SettableFuture.create();

public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)
{
@@ -44,16 +46,17 @@ public KubernetesWorkItem(Task task, ListenableFuture<TaskStatus> statusFuture)

protected synchronized void setKubernetesPeonLifecycle(KubernetesPeonLifecycle kubernetesPeonLifecycle)
{
Preconditions.checkState(this.kubernetesPeonLifecycle == null);
this.kubernetesPeonLifecycle = kubernetesPeonLifecycle;
if (!this.kubernetesPeonLifecycle.set(kubernetesPeonLifecycle)) {
throw DruidException.defensive("Unexpected call to setKubernetesPeonLifecycle, can only call this once.");
}
}
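
The guard above works because SettableFuture.set() returns false when the future already holds a value, which lets the future double as an atomic call-once latch. A small sketch of the idiom, with illustrative names:

```java
import com.google.common.util.concurrent.SettableFuture;

public class CallOnceSketch
{
  private final SettableFuture<String> slot = SettableFuture.create();

  public void init(String value)
  {
    // set() returns false if a value is already present, so a second call
    // is detected atomically, with no extra locking required.
    if (!slot.set(value)) {
      throw new IllegalStateException("init() may only be called once");
    }
  }

  public static void main(String[] args)
  {
    final CallOnceSketch sketch = new CallOnceSketch();
    sketch.init("first");      // succeeds
    try {
      sketch.init("second");   // rejected
    }
    catch (IllegalStateException e) {
      System.out.println("second init rejected");
    }
  }
}
```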

protected synchronized void shutdown()
{

if (this.kubernetesPeonLifecycle != null) {
this.kubernetesPeonLifecycle.startWatchingLogs();
this.kubernetesPeonLifecycle.shutdown();
if (kubernetesPeonLifecycle.isDone()) {
final KubernetesPeonLifecycle lifecycle = FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle);
lifecycle.startWatchingLogs();
lifecycle.shutdown();
}
}

@@ -69,11 +72,11 @@ protected boolean isRunning()

protected RunnerTaskState getRunnerTaskState()
{
if (kubernetesPeonLifecycle == null) {
if (!kubernetesPeonLifecycle.isDone()) {
return RunnerTaskState.PENDING;
}

switch (kubernetesPeonLifecycle.getState()) {
switch (FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getState()) {
case NOT_STARTED:
case PENDING:
return RunnerTaskState.PENDING;
@@ -88,19 +91,19 @@

protected Optional<InputStream> streamTaskLogs()
{
if (kubernetesPeonLifecycle == null) {
if (!kubernetesPeonLifecycle.isDone()) {
return Optional.absent();
}
return kubernetesPeonLifecycle.streamLogs();
return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).streamLogs();
}

@Override
public TaskLocation getLocation()
{
if (kubernetesPeonLifecycle == null) {
if (!kubernetesPeonLifecycle.isDone()) {
return TaskLocation.unknown();
}
return kubernetesPeonLifecycle.getTaskLocation();
return FutureUtils.getUncheckedImmediately(kubernetesPeonLifecycle).getTaskLocation();
}

@Override
@@ -119,4 +122,12 @@ public Task getTask()
{
return task;
}

/**
* Future that resolves to the task location once the work item's lifecycle has been initialized and the location is known.
*/
public ListenableFuture<TaskLocation> getTaskLocationAsync()
{
return FutureUtils.transformAsync(kubernetesPeonLifecycle, KubernetesPeonLifecycle::getTaskLocationAsync);
}
}
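
getTaskLocationAsync() above chains two futures: the outer one resolves when a lifecycle is attached, and the lifecycle's own future resolves when the task is located. A sketch of this two-stage resolution in plain Guava (names are illustrative; Druid's FutureUtils.transformAsync plays the role of Futures.transformAsync here):

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

public class ChainedFutureSketch
{
  public static void main(String[] args) throws Exception
  {
    // Stage 1: resolves when run()/join() attaches a lifecycle.
    final SettableFuture<SettableFuture<String>> lifecycle = SettableFuture.create();
    // Stage 2: resolves when the task's location is discovered.
    final SettableFuture<String> location = SettableFuture.create();

    // The chained future completes only once both stages have completed.
    final ListenableFuture<String> chained = Futures.transformAsync(
        lifecycle,
        inner -> inner,
        MoreExecutors.directExecutor()
    );

    lifecycle.set(location);                 // lifecycle attached...
    System.out.println(chained.isDone());    // false: location still unknown
    location.set("host1:8100");              // ...location discovered
    System.out.println(chained.get());       // host1:8100
  }
}
```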
KubernetesTaskRunnerTest.java

@@ -113,15 +118,18 @@ public void test_start_withExistingJobs() throws IOException
)
{
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected KubernetesWorkItem joinAsync(Task task)
{
return tasks.computeIfAbsent(
final KubernetesWorkItem workItem = tasks.computeIfAbsent(
task.getId(),
k -> new KubernetesWorkItem(
task,
Futures.immediateFuture(TaskStatus.success(task.getId()))
)
).getResult();
);

workItem.setKubernetesPeonLifecycle(kubernetesPeonLifecycle);
return workItem;
}
};

Expand All @@ -133,6 +136,8 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)

EasyMock.expect(peonClient.getPeonJobs()).andReturn(ImmutableList.of(job));
EasyMock.expect(taskAdapter.toTask(job)).andReturn(task);
EasyMock.expect(kubernetesPeonLifecycle.getTaskLocationAsync())
.andReturn(Futures.immediateFuture(TaskLocation.create("localhost", 1234, -1)));

replayAll();

Expand All @@ -142,6 +147,8 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)

Assert.assertNotNull(runner.tasks);
Assert.assertEquals(1, runner.tasks.size());

runner.stop();
}

@Test
@@ -157,10 +164,9 @@ public void test_start_whenDeserializationExceptionThrown_isIgnored() throws IOException
)
{
@Override
protected ListenableFuture<TaskStatus> joinAsync(Task task)
protected KubernetesWorkItem joinAsync(Task task)
{
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null))
.getResult();
return tasks.computeIfAbsent(task.getId(), k -> new KubernetesWorkItem(task, null));
}
};

Expand All @@ -181,6 +187,8 @@ protected ListenableFuture<TaskStatus> joinAsync(Task task)

Assert.assertNotNull(runner.tasks);
Assert.assertEquals(0, runner.tasks.size());

runner.stop();
}

@Test
@@ -286,8 +294,8 @@ public void test_join_withoutExistingTask() throws ExecutionException, InterruptedException

replayAll();

ListenableFuture<TaskStatus> future = runner.joinAsync(task);
Assert.assertEquals(taskStatus, future.get());
KubernetesWorkItem workItem = runner.joinAsync(task);
Assert.assertEquals(taskStatus, workItem.getResult().get());

verifyAll();
}
@@ -310,7 +318,8 @@ public void test_join_whenExceptionThrown_throwsRuntimeException()

replayAll();

ListenableFuture<TaskStatus> future = runner.joinAsync(task);
KubernetesWorkItem workItem = runner.joinAsync(task);
final ListenableFuture<TaskStatus> future = workItem.getResult();

Exception e = Assert.assertThrows(ExecutionException.class, future::get);
Assert.assertTrue(e.getCause() instanceof RuntimeException);
KubernetesWorkItemTest.java

@@ -19,6 +19,7 @@

package org.apache.druid.k8s.overlord;

import org.apache.druid.error.DruidException;
import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexing.common.task.NoopTask;
@@ -60,7 +61,7 @@ public void test_setKubernetesPeonLifecycleTwice_throwsIllegalStateException()
));

Assert.assertThrows(
IllegalStateException.class,
DruidException.class,
() -> workItem.setKubernetesPeonLifecycle(new KubernetesPeonLifecycle(
task,
null,