Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-8972 (2.4 blocker): bug fix for restoring task #7617

Merged
merged 3 commits into from
Nov 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ RuntimeException closeZombieTasks(final Set<TaskId> lostTasks, final List<TopicP
firstException.compareAndSet(null, closeNonRunning(true, created.get(id), lostTaskChangelogs));
} else if (restoring.containsKey(id)) {
log.debug("Closing the zombie restoring stream task {}.", id);
firstException.compareAndSet(null, closeRestoring(true, created.get(id), lostTaskChangelogs));
firstException.compareAndSet(null, closeRestoring(true, restoring.get(id), lostTaskChangelogs));
} else if (running.containsKey(id)) {
log.debug("Closing the zombie running stream task {}.", id);
firstException.compareAndSet(null, closeRunning(true, running.get(id), lostTaskChangelogs));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.function.ThrowingRunnable;
Expand All @@ -49,6 +50,7 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -517,6 +519,147 @@ public void shouldCloseCleanlyWithSuspendedTaskAndEOS() {
assignedTasks.shutdown(true);
}

@Test
public void shouldClearZombieCreatedTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.close(false, true);
}

@Override
public void action(final StreamTask task) {
assignedTasks.addNewTask(task);
}

@Override
public Set<TaskId> taskIds() {
return assignedTasks.created.keySet();
}

@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}

@Test
public void shouldClearZombieRestoringTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.closeStateManager(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm this is interesting: for created the running tasks we call its task.close function whereas for restoring we only call task.closeStateManager, is it intentional? If yes why? cc @ableegoldman

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked you exactly that a few months ago :) You referenced some old PR but basically the takeaway was, a restoring task hasn't initialized anything but its state, therefore needs to close only the state manager

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a little awkward, @abbccdda maybe we should at least put in a comment explaining this for now so we don't have to keep asking each other

Copy link
Contributor

@ableegoldman ableegoldman Oct 31, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimately I've been thinking all these different task types should literally be different types, so that we can just call task.close and have it close whatever is needed for a restoring task, or suspended, etc. We could probably do some further cleanup, for example on close all tasks call suspend then closeSuspended but only active & running tasks are ever just suspended -- and we could probably also drop the double checkpointing (see KAFKA-9113, and please feel free to dump any of your own thoughts around safer task management on there)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree, but my confusion is that why we call close on created tasks? Basically the weird thing I noticed is that for closing as zombies:

  1. created : call close
  2. restoring : call closeStateMgr
  3. running : call close
  4. suspended : call closeSuspended

3 and 4 makes sense to me, but 1/2 are a bit weird: for created tasks, we do not initialize topology nor state managers, but still we call close, whereas restoring ones (where we initialized state manager but not topology) we call closeStateMgr.

Did a closer look at the code base, I think the reason is that in closeNonRunningTasks (which is triggered in onPartitionsRevoked) we actually trigger the following:

task.close(false /* clean */, false /* isZombie */)

I.e. we treat it as an "unclean" close, and hence we won't write checkpoint, and most importantly, when closeTopology throws (which would be the case since it is not initialized at all) we would just ignore it. So "accidentally" this becomes correct.

This is not a great pattern, and I like @ableegoldman 's suggestion that we should consider making "close" call to be more robust in a follow-up PR, for now let's stay with what it is then.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's good to reveal these error-prone close call setups here. My take is that each function starting with close should be distinguished clearly in function comment what it tries to do. And we should avoid including > 1 boolean variables, as it's hard to reason about. What about cleanClose() and uncleanClose()? Also should we be more specific about word zombie? This seems to be a really vague term too.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think (and hope) the term zombie is not used in an unclear way -- but there might be some confusion in the translation from "clients-speak" to "streams-speak" since what the clients simply call "lost" we call "zombie task". Some other examples along that line are the confusion about what revoked means -- sometimes its the partitions given up temporarily, sometimes only the partitions that are being reassigned and permanently given up.

But clear code can always help with unclear terminology: as an example of some progress, we used to have both "suspended" and "zombie" standbys, neither of which really makes sense for a standby. We cleaned up that code in some recent PRs so that, I feel, is now quite clear

@abbccdda I also like the idea of cleanClose, unCleanClose, where each specific task can make it clear what it needs to do in each case. A nice side-effect there is we can clearly comment what clean (or unclean) means at the top of each method, rather than try to interpret every brief parameter description

}

@Override
public void action(final StreamTask task) {
assignedTasks.addTaskToRestoring(task);
}

@Override
public Set<TaskId> taskIds() {
return assignedTasks.restoringTaskIds();
}

@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}

@Test
public void shouldClearZombieRunningTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.close(false, true);
}

@Override
public void action(final StreamTask task) {
assignedTasks.transitionToRunning(task);
}

@Override
public Set<TaskId> taskIds() {
return assignedTasks.runningTaskIds();
}

@Override
public List<TopicPartition> expectedLostChangelogs() {
return clearingPartitions;
}
}.createTaskAndClear();
}

@Test
public void shouldClearZombieSuspendedTasks() {
new TaskTestSuite() {
@Override
public void additionalSetup(final StreamTask task) {
task.initializeTopology();
EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
task.suspend();
task.closeSuspended(false, null);
}

@Override
public void action(final StreamTask task) {
assignedTasks.transitionToRunning(task);
final List<TopicPartition> revokedChangelogs = new ArrayList<>();
final List<TaskId> ids = Collections.singletonList(task.id());
assignedTasks.suspendOrCloseTasks(new HashSet<>(ids), revokedChangelogs);
assertEquals(clearingPartitions, revokedChangelogs);
}

@Override
public Set<TaskId> taskIds() {
return assignedTasks.suspendedTaskIds();
}

@Override
public List<TopicPartition> expectedLostChangelogs() {
return Collections.emptyList();
}
}.createTaskAndClear();
}

abstract class TaskTestSuite {

TaskId clearingTaskId = new TaskId(0, 0);
List<TopicPartition> clearingPartitions = Collections.singletonList(new TopicPartition("topic", 0));

abstract void additionalSetup(final StreamTask task);

abstract void action(final StreamTask task);

abstract Set<TaskId> taskIds();

abstract List<TopicPartition> expectedLostChangelogs();

void createTaskAndClear() {
final StreamTask task = EasyMock.createMock(StreamTask.class);
EasyMock.expect(task.id()).andReturn(clearingTaskId).anyTimes();
EasyMock.expect(task.changelogPartitions()).andReturn(clearingPartitions).anyTimes();
additionalSetup(task);
EasyMock.replay(task);

action(task);
final List<TopicPartition> changelogs = new ArrayList<>();
final Set<TaskId> ids = new HashSet<>(Collections.singleton(task.id()));
assertEquals(ids, taskIds());

assignedTasks.closeZombieTasks(ids, changelogs);
assertEquals(Collections.emptySet(), taskIds());
assertEquals(expectedLostChangelogs(), changelogs);
}
}

private void addAndInitTask() {
assignedTasks.addNewTask(t1);
assignedTasks.initializeNewTasks();
Expand Down