diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index d5b306167c94..61bd48a624a7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -692,6 +692,12 @@ protected void onJoinPrepare(int generation, String memberId) { @Override public void onLeavePrepare() { + // Save the current Generation and use that to get the memberId, as the hb thread can change it at any time + final Generation currentGeneration = generation(); + final String memberId = currentGeneration.memberId; + + log.debug("Executing onLeavePrepare with generation {} and memberId {}", currentGeneration, memberId); + // we should reset assignment and trigger the callback before leaving group Set droppedPartitions = new HashSet<>(subscriptions.assignedPartitions()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java index f217a555338b..0f8896eea797 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStandbyTasks.java @@ -33,10 +33,10 @@ class AssignedStandbyTasks extends AssignedTasks { @Override public void shutdown(final boolean clean) { final String shutdownType = clean ? "Clean" : "Unclean"; - log.debug(shutdownType + " shutdown of all standby tasks" + "\n" + + log.debug("{} shutdown of all standby tasks" + "\n" + "non-initialized standby tasks to close: {}" + "\n" + "running standby tasks to close: {}", - clean, created.keySet(), running.keySet()); + shutdownType, created.keySet(), running.keySet()); super.shutdown(clean); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index 161714e34cf7..1400d5a13cc8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -494,12 +494,12 @@ void clear() { @Override public void shutdown(final boolean clean) { final String shutdownType = clean ? "Clean" : "Unclean"; - log.debug(shutdownType + " shutdown of all active tasks" + "\n" + + log.debug("{} shutdown of all active tasks" + "\n" + "non-initialized stream tasks to close: {}" + "\n" + "restoring tasks to close: {}" + "\n" + "running stream tasks to close: {}" + "\n" + "suspended stream tasks to close: {}", - clean, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet()); + shutdownType, created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet()); super.shutdown(clean); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 78ee40c42234..38a150b70e41 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -1109,7 +1109,6 @@ public void onAssignment(final Assignment assignment, final ConsumerGroupMetadat // Check if this was a version probing rebalance and check the error code to trigger another rebalance if so if (maybeUpdateSubscriptionVersion(receivedAssignmentMetadataVersion, latestCommonlySupportedVersion)) { setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); - return; } // version 1 field diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java index f2c75b203c02..a4f1f6a4a900 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.java @@ -68,15 +68,7 @@ public void onPartitionsAssigned(final Collection assignedPartit if (streamThread.setState(State.PARTITIONS_ASSIGNED) == null) { log.debug( "Skipping task creation in rebalance because we are already in {} state.", - streamThread.state() - ); - } else if (streamThread.getAssignmentErrorCode() != AssignorError.NONE.code()) { - log.debug( - "Encountered assignment error during partition assignment: {}. Skipping task initialization and " - + "pausing any partitions we may have been assigned.", - streamThread.getAssignmentErrorCode() - ); - taskManager.pausePartitions(); + streamThread.state()); } else { // Close non-reassigned tasks before initializing new ones as we may have suspended active // tasks that become standbys or vice versa diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 72cff77febc2..4d6dd4df34f8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -467,21 +467,22 @@ public void setAssignmentMetadata(final Map> activeT } log.debug("Assigning metadata with: " + - "\tactiveTasks: {},\n" + - "\tstandbyTasks: {}\n" + - "The updated active task states are: \n" + + "\tpreviousAssignedActiveTasks: {},\n" + + "\tpreviousAssignedStandbyTasks: {}\n" + + "The updated task states are: \n" + "\tassignedActiveTasks {},\n" + "\tassignedStandbyTasks {},\n" + "\taddedActiveTasks {},\n" + "\taddedStandbyTasks {},\n" + "\trevokedActiveTasks {},\n" + "\trevokedStandbyTasks {}", - activeTasks, standbyTasks, assignedActiveTasks, assignedStandbyTasks, + activeTasks, standbyTasks, addedActiveTasks, addedStandbyTasks, revokedActiveTasks, revokedStandbyTasks); - this.assignedActiveTasks = activeTasks; - this.assignedStandbyTasks = standbyTasks; + + assignedActiveTasks = activeTasks; + assignedStandbyTasks = standbyTasks; } public void updateSubscriptionsFromAssignment(final List partitions) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 185fa7c3bb14..bcb6d828e418 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -173,7 +173,6 @@ public void onAssignment(final ConsumerPartitionAssignor.Assignment assignment, if (super.maybeUpdateSubscriptionVersion(usedVersion, info.commonlySupportedVersion())) { setAssignmentErrorCode(AssignorError.VERSION_PROBING.code()); - return; } final List partitions = new ArrayList<>(assignment.partitions());