Skip to content

Commit

Permalink
KAFKA-8972 (2.4 blocker): TaskManager state should always be updated …
Browse files Browse the repository at this point in the history
…after rebalance (#7620)

Currently when we identify version probing we return early from onAssignment and never get to updating the TaskManager and general state with the new assignment. Since we do actually give out "real" assignments even during version probing, a StreamThread should take real ownership of its tasks/partitions including cleaning them up in onPartitionsRevoked which gets invoked when we call onLeavePrepare as part of triggering the follow-up rebalance.

Every member will always get an assignment encoded with the lowest common version, so there should be no problem decoding a VP assignment. We should just allow onAssignment to proceed as usual so that the TaskManager is in a consistent state, and knows what all its tasks/partitions are when the first rebalance completes and the next one is triggered.

Reviewers: Boyang Chen <boyang@confluent.io>, Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>
  • Loading branch information
ableegoldman authored and guozhangwang committed Nov 1, 2019
1 parent 2421a69 commit d61b0c1
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,12 @@ protected void onJoinPrepare(int generation, String memberId) {

@Override
public void onLeavePrepare() {
// Save the current Generation and use that to get the memberId, as the hb thread can change it at any time
final Generation currentGeneration = generation();
final String memberId = currentGeneration.memberId;

log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration, memberId);

// we should reset assignment and trigger the callback before leaving group
Set<TopicPartition> droppedPartitions = new HashSet<>(subscriptions.assignedPartitions());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class AssignedStandbyTasks extends AssignedTasks<StandbyTask> {
@Override
public void shutdown(final boolean clean) {
final String shutdownType = clean ? "Clean" : "Unclean";
log.debug(shutdownType + " shutdown of all standby tasks" + "\n" +
log.debug("{} shutdown of all standby tasks" + "\n" +
"non-initialized standby tasks to close: {}" + "\n" +
"running standby tasks to close: {}",
clean, created.keySet(), running.keySet());
shutdownType, created.keySet(), running.keySet());
super.shutdown(clean);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,12 +494,12 @@ void clear() {
@Override
public void shutdown(final boolean clean) {
final String shutdownType = clean ? "Clean" : "Unclean";
log.debug(shutdownType + " shutdown of all active tasks" + "\n" +
log.debug("{} shutdown of all active tasks" + "\n" +
"non-initialized stream tasks to close: {}" + "\n" +
"restoring tasks to close: {}" + "\n" +
"running stream tasks to close: {}" + "\n" +
"suspended stream tasks to close: {}",
clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
shutdownType, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
super.shutdown(clean);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,6 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat
// Check if this was a version probing rebalance and check the error code to trigger another rebalance if so
if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) {
setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
return;
}

// version 1 field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,7 @@ public void onPartitionsAssigned(final Collection<TopicPartition> assignedPartit
if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) {
log.debug(
"Skipping task creation in rebalance because we are already in {} state.",
streamThread.state()
);
} else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) {
log.debug(
"Encountered assignment error during partition assignment: {}. Skipping task initialization and "
+ "pausing any partitions we may have been assigned.",
streamThread.getAssignmentErrorCode()
);
taskManager.pausePartitions();
streamThread.state());
} else {
// Close non-reassigned tasks before initializing new ones as we may have suspended active
// tasks that become standbys or vice versa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,21 +467,22 @@ public void setAssignmentMetadata(final Map<TaskId, Set<TopicPartition>> activeT
}

log.debug("Assigning metadata with: " +
"\tactiveTasks: {},\n" +
"\tstandbyTasks: {}\n" +
"The updated active task states are: \n" +
"\tpreviousAssignedActiveTasks: {},\n" +
"\tpreviousAssignedStandbyTasks: {}\n" +
"The updated task states are: \n" +
"\tassignedActiveTasks {},\n" +
"\tassignedStandbyTasks {},\n" +
"\taddedActiveTasks {},\n" +
"\taddedStandbyTasks {},\n" +
"\trevokedActiveTasks {},\n" +
"\trevokedStandbyTasks {}",
activeTasks, standbyTasks,
assignedActiveTasks, assignedStandbyTasks,
activeTasks, standbyTasks,
addedActiveTasks, addedStandbyTasks,
revokedActiveTasks, revokedStandbyTasks);
this.assignedActiveTasks = activeTasks;
this.assignedStandbyTasks = standbyTasks;

assignedActiveTasks = activeTasks;
assignedStandbyTasks = standbyTasks;
}

public void updateSubscriptionsFromAssignment(final List<TopicPartition> partitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment,
if (maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) {
setAssignmentErrorCode(AssignorError.VERSION_PROBING.code());
usedSubscriptionMetadataVersionPeek.set(usedSubscriptionMetadataVersion);
return;
}

final List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
Expand Down

0 comments on commit d61b0c1

Please sign in to comment.