Skip to content

Commit

Permalink
Load getTaskLocation on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
georgew5656 committed Oct 15, 2024
1 parent c2149d5 commit 7aa2a61
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,24 @@ private void writeTaskPayload(Task task) throws IOException
protected synchronized TaskStatus join(long timeout) throws IllegalStateException
{
try {
Pod pod = kubernetesClient.getPeonPodWithRetries(taskId.getK8sJobName());
PodStatus podStatus = pod.getStatus();

if (podStatus != null && podStatus.getPodIP() != null) {
/* It's okay to cache this because podIP only changes on pod restart, and we have to set restartPolicy to Never
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
taskLocation = TaskLocation.create(
podStatus.getPodIP(),
DruidK8sConstants.PORT,
DruidK8sConstants.TLS_PORT,
Boolean.parseBoolean(pod.getMetadata().getAnnotations().getOrDefault(DruidK8sConstants.TLS_ENABLED, "false")),
pod.getMetadata() != null ? pod.getMetadata().getName() : ""
);
} else {
log.warn("Could not get task location for [%s]", taskId);
}
updateState(new State[]{State.NOT_STARTED, State.PENDING}, State.RUNNING);

JobResponse jobResponse = kubernetesClient.waitForPeonJobCompletion(
Expand Down Expand Up @@ -249,11 +267,9 @@ protected TaskLocation getTaskLocation()
return TaskLocation.unknown();
}

/* It's okay to cache this because podIP only changes on pod restart, and we have to set restartPolicy to Never
since Druid doesn't support retrying tasks from a external system (K8s). We can explore adding a fabric8 watcher
if we decide we need to change this later.
**/
// Technically this should never really be true, but leaving it in for now.
if (taskLocation == null) {
log.warn("getTaskLocation [%s] called without location being cached. ", taskId);
Optional<Pod> maybePod = kubernetesClient.getPeonPod(taskId.getK8sJobName());
if (!maybePod.isPresent()) {
return TaskLocation.unknown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ public void test_join_withoutJob_returnsFailedTaskStatus() throws IOException
stateListener.stateChanged(KubernetesPeonLifecycle.State.STOPPED, ID);
EasyMock.expectLastCall().once();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -337,7 +340,9 @@ public void test_join() throws IOException
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -393,7 +398,9 @@ public void test_join_whenCalledMultipleTimes_raisesIllegalStateException() thro
EasyMock.expectLastCall();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(EasyMock.anyString())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
).anyTimes();
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -445,7 +452,9 @@ public void test_join_withoutTaskStatus_returnsFailedTaskStatus() throws IOExcep
EasyMock.expectLastCall().once();

Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
replayAll();

TaskStatus taskStatus = peonLifecycle.join(0L);
Expand Down Expand Up @@ -493,7 +502,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskStatus_returnsFaile
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -545,7 +556,9 @@ public void test_join_whenIOExceptionThrownWhileStreamingTaskLogs_isIgnored() th
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down Expand Up @@ -585,7 +598,9 @@ public void test_join_whenRuntimeExceptionThrownWhileWaitingForKubernetesJob_thr
EasyMock.expectLastCall().once();
logWatch.close();
EasyMock.expectLastCall();

EasyMock.expect(kubernetesClient.getPeonPodWithRetries(k8sTaskId.getK8sJobName())).andReturn(
new PodBuilder().editOrNewStatus().addNewPodIP("ip").endStatus().build()
);
Assert.assertEquals(KubernetesPeonLifecycle.State.NOT_STARTED, peonLifecycle.getState());

replayAll();
Expand Down

0 comments on commit 7aa2a61

Please sign in to comment.