Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-2011] Fix bug where another host could report the job as skipped, then the … #3888

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,20 @@ public List<FlowStatus> getFlowStatusesAcrossGroup(String flowGroup, int countPe
* @return true, if any jobs of the flow are RUNNING.
*/
public boolean isFlowRunning(String flowName, String flowGroup, long flowExecutionId) {
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 1, null);
// Ask for two statuses instead of one (third argument 1 -> 2; presumably the number of most recent executions to
// fetch -- confirm against getLatestFlowStatus). The code below treats index 0 as the latest execution, which may
// or may not be the incoming flowExecutionId, and index 1 (when present) as the execution before it.
List<FlowStatus> flowStatusList = getLatestFlowStatus(flowName, flowGroup, 2, null);
// No status available at all: report the flow as not running.
if (flowStatusList == null || flowStatusList.isEmpty()) {
return false;
}
// Index 0 is the entry the rest of this method refers to as the "latest" flow status.
FlowStatus flowStatus = flowStatusList.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you make a comment to make clear that the first one is the most recent and may or may not match the pending flowExecutionId attempt and the second index is an older one? It's a bit hard to keep track of what's going on here without reading ur commit desc

ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId);
// If the latest flow status is the current job about to get kicked off, we should ignore this check
if (flowStatus.getFlowExecutionId() == flowExecutionId) {
// Another host may have already emitted a flow status that skipped this flow execution, so compare against the previous flow status
// With fewer than two entries there is no previous execution to compare against, so previousFlowStatus is null
// and the method returns false.
FlowStatus previousFlowStatus = flowStatusList.size() > 1 ? flowStatusList.get(1) : null;
// NOTE(review): this returns true when the previous status IS in FINISHED_STATUSES, whereas the else-branch below
// returns true when the latest status is NOT in FINISHED_STATUSES. For a method named isFlowRunning this looks
// inverted -- confirm whether !FINISHED_STATUSES.contains(...) was intended here.
return previousFlowStatus != null && FINISHED_STATUSES.contains(previousFlowStatus.getFlowExecutionStatus().name());
} else {
FlowStatus flowStatus = flowStatusList.get(0);
ExecutionStatus flowExecutionStatus = flowStatus.getFlowExecutionStatus();
log.info("Comparing flow execution status with flowExecutionId: " + flowStatus.getFlowExecutionId() + " and flowStatus: " + flowExecutionStatus + " with incoming flowExecutionId: " + flowExecutionId);
// If the latest flow status is the current job about to get kicked off, we should ignore this check
return flowStatus.getFlowExecutionId() != flowExecutionId && !FINISHED_STATUSES.contains(flowExecutionStatus.name());
// The latest status belongs to a different execution: it counts as running unless its status is a finished one.
return !FINISHED_STATUSES.contains(flowExecutionStatus.name());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.anyInt;


public class FlowStatusGeneratorTest {
Expand All @@ -43,7 +45,7 @@ public void testIsFlowRunningFirstExecution() {
String flowName = "testName";
String flowGroup = "testGroup";
long currFlowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(null);
when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), anyInt())).thenReturn(null);

FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, currFlowExecutionId));
Expand All @@ -55,7 +57,7 @@ public void testIsFlowRunningCompiledPastExecution() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Expand All @@ -73,25 +75,49 @@ public void skipFlowConcurrentCheckSameFlowExecutionId() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn(
Lists.newArrayList(flowExecutionId));
JobStatus jobStatus = JobStatus.builder().flowGroup(flowGroup).flowName(flowName).flowExecutionId(flowExecutionId)
.jobName(JobStatusRetriever.NA_KEY).jobGroup(JobStatusRetriever.NA_KEY).eventName(ExecutionStatus.COMPILED.name()).build();
Iterator<JobStatus> jobStatusIterator = Lists.newArrayList(jobStatus).iterator();
when(jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId)).thenReturn(
jobStatusIterator);
FlowStatusGenerator flowStatusGenerator = new FlowStatusGenerator(jobStatusRetriever);
// If the flow is compiled but the flow execution status is the same as the one about to be kicked off, do not consider it as running.
// If the flow is compiled but the flow execution id is the same as the one about to be kicked off, do not consider it as running.
Assert.assertFalse(flowStatusGenerator.isFlowRunning(flowName, flowGroup, flowExecutionId));
}

@Test
@Test
public void skipCurrentFlowBasedOnPriorFlowExecutionId() {
  // Scenario: the prior execution (1234) is still RUNNING while the execution about to be kicked off (1235)
  // already carries a FAILED status, e.g. because another host skipped it.
  String flowGroup = "testGroup";
  String flowName = "testName";
  long priorExecutionId = 1234L;
  long currentExecutionId = 1235L;

  JobStatusRetriever retriever = Mockito.mock(JobStatusRetriever.class);

  // NOTE(review): the ids are stubbed in the order [1234, 1235]; which of them ends up as the "latest" status
  // depends on ordering applied outside this test -- confirm this exercises the same-execution-id branch.
  when(retriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt()))
      .thenReturn(Lists.newArrayList(priorExecutionId, currentExecutionId));

  // Flow-level status of the prior execution: RUNNING.
  JobStatus priorStatus = JobStatus.builder()
      .flowGroup(flowGroup)
      .flowName(flowName)
      .flowExecutionId(priorExecutionId)
      .jobName(JobStatusRetriever.NA_KEY)
      .jobGroup(JobStatusRetriever.NA_KEY)
      .eventName(ExecutionStatus.RUNNING.name())
      .build();
  // Flow-level status of the current execution: FAILED.
  JobStatus currentStatus = JobStatus.builder()
      .flowGroup(flowGroup)
      .flowName(flowName)
      .flowExecutionId(currentExecutionId)
      .jobName(JobStatusRetriever.NA_KEY)
      .jobGroup(JobStatusRetriever.NA_KEY)
      .eventName(ExecutionStatus.FAILED.name())
      .build();

  when(retriever.getJobStatusesForFlowExecution(flowName, flowGroup, priorExecutionId))
      .thenReturn(Lists.newArrayList(priorStatus).iterator());
  when(retriever.getJobStatusesForFlowExecution(flowName, flowGroup, currentExecutionId))
      .thenReturn(Lists.newArrayList(currentStatus).iterator());

  FlowStatusGenerator generator = new FlowStatusGenerator(retriever);

  // If prior flow is still running but another host failed the current flow to skip it,
  // the prior flow execution must be used as the reference, so the flow counts as running.
  boolean running = generator.isFlowRunning(flowName, flowGroup, currentExecutionId);
  Assert.assertTrue(running);
}

@Test
public void testIsFlowRunningJobExecutionIgnored() {
String flowName = "testName";
String flowGroup = "testGroup";
long flowExecutionId = 1234L;
JobStatusRetriever jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
when(jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 1)).thenReturn(
when(jobStatusRetriever.getLatestExecutionIdsForFlow(eq(flowName), eq(flowGroup), Mockito.anyInt())).thenReturn(
Lists.newArrayList(flowExecutionId));
//JobStatuses should be ignored, only the flow level status matters.
String job1 = "job1";
Expand Down
Loading