Skip to content

Commit

Permalink
[Bug][Zeta] Fix TaskExecutionService will return not active Execution…
Browse files Browse the repository at this point in the history
…Context (#4869)
  • Loading branch information
Hisoka-X authored Jun 5, 2023
1 parent 2e9a1bd commit d0a47b9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ public TaskGroupContext getExecutionContext(TaskGroupLocation taskGroupLocation)
return taskGroupContext;
}

public TaskGroupContext getActiveExecutionContext(TaskGroupLocation taskGroupLocation) {
TaskGroupContext taskGroupContext = executionContexts.get(taskGroupLocation);

if (taskGroupContext == null) {
throw new TaskGroupContextNotFoundException(
String.format("task group %s not found.", taskGroupLocation));
}
return taskGroupContext;
}

private void submitThreadShareTask(
TaskGroupExecutionTracker taskGroupExecutionTracker, List<Task> tasks) {
Stream<TaskTracker> taskTrackerStream =
Expand Down Expand Up @@ -231,7 +241,7 @@ public TaskDeployState deployTask(@NonNull Data taskImmutableInformation) {

public <T extends Task> T getTask(@NonNull TaskLocation taskLocation) {
TaskGroupContext executionContext =
this.getExecutionContext(taskLocation.getTaskGroupLocation());
this.getActiveExecutionContext(taskLocation.getTaskGroupLocation());
return executionContext.getTaskGroup().getTask(taskLocation.getTaskID());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,8 @@ protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
// TODO: clear related future & scheduler task
pendingCheckpoints.clear();
}
pipelineTaskStatus.clear();
readyToCloseStartingTask.clear();
pendingCounter.set(0);
scheduler.shutdownNow();
scheduler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,15 @@ public void receivedReader(TaskLocation readerId, Address memberAddr)
SourceSplitEnumerator<SplitT, Serializable> enumerator = getEnumerator();
this.addTaskMemberMapping(readerId, memberAddr);
enumerator.registerReader(readerId.getTaskIndex());
if (maxReaderSize == taskMemberMapping.size()) {
int taskSize = taskMemberMapping.size();
if (maxReaderSize == taskSize) {
readerRegisterComplete = true;
log.debug(String.format("reader register complete, current task size %d", taskSize));
} else {
log.debug(
String.format(
"current task size %d, need size %d to complete register",
taskSize, maxReaderSize));
}
}

Expand Down

0 comments on commit d0a47b9

Please sign in to comment.