KAFKA-8972 (2.4 blocker): bug fix for restoring task #7617
Conversation
Yikes. Way to catch this!

Great catch! We should consider how this got all the way to our soak cluster before we caught it. Seems like we are lacking unit test coverage for this code.

@apurvam Totally agree, this is clearly a bug that a unit test could have caught. Let me check and add some tests.

The fix LGTM. Waiting for the added unit test before merging.
```java
@Override
public void additionalSetup(final StreamTask task) {
    EasyMock.expect(task.partitions()).andReturn(Collections.emptySet()).anyTimes();
    task.closeStateManager(false);
}
```
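As a self-contained illustration of the kind of coverage discussed above, here is a sketch using a hand-rolled fake instead of EasyMock; `FakeTask` and `clearZombieRestoringTasks` are hypothetical names, not the actual Kafka Streams classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Streams task internals under discussion.
class FakeTask {
    boolean stateManagerClosed = false;

    // A restoring task has initialized only its state manager,
    // so clearing it as a zombie should close just that.
    void closeStateManager(final boolean clean) {
        stateManagerClosed = true;
    }
}

class Zombies {
    // Mimics clearing all restoring tasks on a TaskMigratedException.
    static void clearZombieRestoringTasks(final List<FakeTask> restoring) {
        for (final FakeTask task : restoring) {
            task.closeStateManager(false); // unclean: no checkpoint written
        }
        restoring.clear();
    }
}

public class ZombieRestoringTaskTest {
    public static void main(final String[] args) {
        final List<FakeTask> restoring = new ArrayList<>();
        final FakeTask task = new FakeTask();
        restoring.add(task);

        Zombies.clearZombieRestoringTasks(restoring);

        assert task.stateManagerClosed : "state manager should be closed";
        assert restoring.isEmpty() : "restoring set should be cleared";
        System.out.println("ok");
    }
}
```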
Hmm, this is interesting: for created and running tasks we call `task.close`, whereas for restoring tasks we only call `task.closeStateManager`. Is that intentional? If yes, why? cc @ableegoldman
I asked you exactly that a few months ago :) You referenced some old PR, but basically the takeaway was: a restoring task hasn't initialized anything but its state, so it needs to close only the state manager.
It's a little awkward. @abbccdda, maybe we should at least put in a comment explaining this for now so we don't have to keep asking each other.
Ultimately I've been thinking all these different task types should literally be different types, so that we can just call `task.close` and have it close whatever is needed for a restoring task, or a suspended one, etc. We could probably do some further cleanup too: for example, on close all tasks call `suspend` then `closeSuspended`, but only active, running tasks are ever actually suspended. We could probably also drop the double checkpointing (see KAFKA-9113, and please feel free to dump any of your own thoughts around safer task management on there).
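The "one type per lifecycle state" idea above could be sketched roughly as follows; the class names here are illustrative, not the real Kafka Streams hierarchy:

```java
// Hypothetical sketch: each lifecycle state is its own type, so callers
// just invoke close() and each type tears down only what it initialized.
class StateManager {
    boolean closed = false;
    void close() { closed = true; }
}

class Topology {
    boolean closed = false;
    void close() { closed = true; }
}

interface Task {
    void close();
}

// A created task initialized nothing, so close() is a no-op.
class CreatedTask implements Task {
    @Override public void close() { }
}

// A restoring task initialized only its state manager.
class RestoringTask implements Task {
    final StateManager stateManager = new StateManager();
    @Override public void close() { stateManager.close(); }
}

// A running task initialized both topology and state manager.
class RunningTask implements Task {
    final StateManager stateManager = new StateManager();
    final Topology topology = new Topology();
    @Override public void close() {
        topology.close();
        stateManager.close();
    }
}
```

With this shape the caller never needs to know which teardown steps apply; dispatch on the concrete type does the right thing.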
I think I agree, but my confusion is why we call close on created tasks. Basically, the weird thing I noticed is what we call when closing as zombies:

1. created: call `close`
2. restoring: call `closeStateMgr`
3. running: call `close`
4. suspended: call `closeSuspended`

3 and 4 make sense to me, but 1 and 2 are a bit weird: for created tasks we initialize neither topology nor state manager, but we still call `close`, whereas for restoring ones (where we initialized the state manager but not the topology) we call `closeStateMgr`.
Took a closer look at the code base; I think the reason is that in `closeNonRunningTasks` (which is triggered in `onPartitionsRevoked`) we actually trigger the following:

```java
task.close(false /* clean */, false /* isZombie */)
```

I.e. we treat it as an "unclean" close, and hence we won't write a checkpoint. Most importantly, when `closeTopology` throws (which would be the case since the topology was never initialized at all) we just ignore it. So "accidentally" this becomes correct.
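Why the unclean path is "accidentally" correct for created tasks can be seen in a minimal sketch; this is a hypothetical simplification, not the actual Streams close logic:

```java
// Hypothetical simplification of the close(clean, ...) pattern discussed
// above: on an unclean close, a failure in closeTopology() (e.g. because
// the topology was never initialized) is swallowed, which is why closing
// a created task "accidentally" works.
class TaskCloser {
    private final boolean topologyInitialized;

    TaskCloser(final boolean topologyInitialized) {
        this.topologyInitialized = topologyInitialized;
    }

    void closeTopology() {
        if (!topologyInitialized) {
            throw new IllegalStateException("topology not initialized");
        }
    }

    void close(final boolean clean) {
        try {
            closeTopology();
        } catch (final RuntimeException e) {
            if (clean) {
                throw e; // a clean close must not hide failures
            }
            // unclean close: ignore and keep tearing down
        }
        // ... close state manager, skipping the checkpoint when unclean ...
    }
}
```

An unclean `close(false)` on an uninitialized task returns normally, while a clean `close(true)` surfaces the error.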
This is not a great pattern, and I like @ableegoldman's suggestion that we should consider making the "close" call more robust in a follow-up PR. For now let's stay with what it is.
It's good to surface these error-prone close-call setups here. My take is that each function starting with `close` should state clearly in its function comment what it tries to do, and we should avoid having more than one boolean parameter, as that's hard to reason about. What about `cleanClose()` and `uncleanClose()`? Also, should we be more specific than the word `zombie`? It seems like a really vague term too.
I think (and hope) the term `zombie` is not used in an unclear way, but there might be some confusion in the translation from "clients-speak" to "streams-speak", since what the clients simply call "lost" we call a "zombie task". Another example along that line is the confusion about what "revoked" means: sometimes it's the partitions given up temporarily, sometimes only the partitions that are being reassigned and permanently given up.

But clear code can always help with unclear terminology. As an example of some progress, we used to have both "suspended" and "zombie" standbys, neither of which really makes sense for a standby; we cleaned up that code in some recent PRs, so it now feels quite clear.
@abbccdda I also like the idea of `cleanClose` / `uncleanClose`, where each specific task can make clear what it needs to do in each case. A nice side effect is that we can clearly comment what clean (or unclean) means at the top of each method, rather than trying to interpret every brief parameter description.
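The `cleanClose()` / `uncleanClose()` split proposed above might look roughly like this; these method names are a proposal in this thread, not existing Streams API:

```java
// Illustrative sketch of replacing close(clean, isZombie) with two
// explicitly named methods, as proposed above. Names are hypothetical.
interface CloseableTask {
    /**
     * Flush and commit everything, write the checkpoint, then release
     * all resources. Any failure is fatal and must be rethrown.
     */
    void cleanClose();

    /**
     * Release resources on a best-effort basis: skip the checkpoint and
     * swallow teardown errors, e.g. when the task is a zombie.
     */
    void uncleanClose();
}

class ExampleTask implements CloseableTask {
    // Records the teardown steps taken, for illustration.
    final StringBuilder log = new StringBuilder();

    @Override public void cleanClose() {
        log.append("flush;checkpoint;release;");
    }

    @Override public void uncleanClose() {
        log.append("release;"); // no flush, no checkpoint
    }
}
```

The intent of each path then lives in one Javadoc comment per method instead of in a pair of boolean flags at every call site.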
Should we file an issue to track Sophie's specific improvement? The issue she linked to seems more general than a refactor.
Left our thoughts on https://issues.apache.org/jira/browse/KAFKA-9113?focusedCommentId=16964468&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16964468 — I'm going to merge this PR to trunk and 2.4 now.
This was a typo bug: the wrong map was being called.

Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, Guozhang Wang <guozhang@confluent.io>
Fixed a minor conflict in `.gitignore` and fixed compiler errors in KafkaUtilities due to the `PartitionReplicaAssignment` rename to `ReplicaAssignment`.

* apache-github/trunk: (34 commits)
  * HOTFIX: Try to complete Send even if no bytes were written (apache#7622)
  * KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets
  * KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (apache#7620)
  * MINOR: Fix Kafka Streams JavaDocs with regard to new StreamJoined class (apache#7627)
  * MINOR: Fix sensor retrieval in standby task's constructor (apache#7632)
  * MINOR: Replace some Java 7 style code with Java 8 style (apache#7623)
  * KAFKA-8868: Generate SubscriptionInfo protocol message (apache#7248)
  * MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric
  * KAFKA-8972 (2.4 blocker): bug fix for restoring task (apache#7617)
  * KAFKA-9093: NullPointerException in KafkaConsumer with group.instance.id (apache#7590)
  * KAFKA-8980: Refactor state-store-level streams metrics (apache#7584)
  * MINOR: Fix documentation for updateCurrentReassignment (apache#7611)
  * MINOR: Preserve backwards-compatibility by renaming the AlterPartitionReassignment metric to PartitionReassignment
  * KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException (apache#7608)
  * KAFKA-9077: Fix reading of metrics of Streams' SimpleBenchmark (apache#7610)
  * KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe() (apache#7441)
  * MINOR: improve logging of tasks on shutdown (apache#7597)
  * KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (apache#7576)
  * MINOR: Fix command examples in kafka-reassign-partitions.sh docs (apache#7583)
  * KAFKA-9102; Increase default zk session timeout and replica max lag [KIP-537] (apache#7596)
  * ...