-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building #7576
KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building #7576
Conversation
We already have the partition fetch data in the fetch session, so a copy is not required.
isReplicaInSync check. Reduce cost of updating map by not unnecessarily wrapping in ClientIdTopicPartition in FetcherStats
new partitions are left in next map.
We will get better performance improvements by redesigning how incremental fetch sessions work in the Replica Fetcher. Minimizing differences in the fetch session will minimize risks of the changeset.
…dstreet/kafka into K9039-p1-lucas
…dstreet/kafka into K9039-p1-lucas
At first look, this looks good. I think the truncation code where we now avoid unnecessary collection generation don't seem to be in the hot path, but are worth eliminating. We should compare benchmark results to make sure the asScala/BiConsumer changes don't de-optimize anything https://github.com/apache/kafka/pull/7576/files#diff-2d03a5d4349a23e56c09b4e773055249L157, as some ugliness is probably worth it if this operation is in the hot path. |
retest this please |
Benchmark results shows small difference from the current trunk: This branch:
This branch revert the asScala change:
Trunk:
|
@lbradstreet seems the asScala does have some regression in optimization so I've reverted it (see above benchmark results). ping @hachikuji for a final review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, just two small comments.
@@ -53,7 +53,7 @@ class ReplicaFetcherThread(name: String, | |||
failedPartitions, | |||
fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, | |||
isInterruptible = false, | |||
replicaMgr.brokerTopicStats) { | |||
if (replicaMgr == null) new BrokerTopicStats else replicaMgr.brokerTopicStats) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would ReplicaManager
be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only the case in the benchmark code: https://github.com/apache/kafka/blob/trunk/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java#L239
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can fix the benchmark? I'm not sure we should be catering the code to test cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To me it is more reasonable to change the constructor actually: the brokerTopicStats
is added this week which breaks the benchmark silently, while I looked at this case I felt that theoretically we should allow the replica fetcher constructor to take either a mock replica manager or a null in order for unit tests and benchmarks that do not require the embedded module but are only testing the logic of the replica fetcher itself. I looked into how much it would take to create a mock replica manager and felt it is a bit too larger of a scope for this PR itself so I make this shortcut fix. If you feel strong about this I think we should be looking into creating a mock replica manager really, instead of letting tests that only want to test the replica fetcher module to initialize a full replica manager.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should do this lbradstreet@8938a6b. I originally passed null in because I had overridden all of the methods that used it. We need to avoid mocks in anything we want to benchmark, but since it's in the setup this seems fine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we can factor out of ReplicaFetcherThread the critical performance sections. Obviously we don't actually support allowing ReplicaManager
to be null here.
val currentFetchState = state.value | ||
val maybeTruncationComplete = fetchOffsets.get(state.topicPartition) match { | ||
val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala | ||
.map { case(topicPartition, currentFetchState) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add space after case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack
A quick history on this function:
So I think this is just a mistake that this test-only function is used in the critical path that caused inefficiency. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@guozhangwang LGTM. Will leave you and @lbradstreet to figure out the null replicaMgr
check. I don't think it should be there, but we can fix it separately if you prefer.
I've merged in @lbradstreet 's commit with the mock on benchmark, and will run the unit test again before merging. |
There was a discussion about |
Fixed a minor conflict in `.gitignore` and fix compiler errors in KafkaUtilities due to `PartitionReplicaAssignment` rename to `ReplicaAssignment`. * apache-github/trunk: (34 commits) HOTFIX: Try to complete Send even if no bytes were written (apache#7622) KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (apache#7620) MINOR: Fix Kafka Streams JavaDocs with regard to new StreamJoined class (apache#7627) MINOR: Fix sensor retrieval in stand0by task's constructor (apache#7632) MINOR: Replace some Java 7 style code with Java 8 style (apache#7623) KAFKA-8868: Generate SubscriptionInfo protocol message (apache#7248) MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric KAFKA-8972 (2.4 blocker): bug fix for restoring task (apache#7617) KAFKA-9093: NullPointerException in KafkaConsumer with group.instance.id (apache#7590) KAFKA-8980: Refactor state-store-level streams metrics (apache#7584) MINOR: Fix documentation for updateCurrentReassignment (apache#7611) MINOR: Preserve backwards-compatibility by renaming the AlterPartitionReassignment metric to PartitionReassignment KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException (apache#7608) KAFKA-9077: Fix reading of metrics of Streams' SimpleBenchmark (apache#7610) KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe() (apache#7441) MINOR: improve logging of tasks on shutdown (apache#7597) KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (apache#7576) MINOR: Fix command examples in kafka-reassign-partitions.sh docs (apache#7583) KAFKA-9102; Increase default zk session timeout and replica max lag [KIP-537] (apache#7596) ...
@ijuma my thinking was that it would give the impression that the underlying map won't change and it is safe for it to escape the thread in which it is used. In hindsight I do think I was wrong and we should make it unmodifiable even if it is to only prevent modifications outside of the PartitionStates interface. I can submit a PR for this. |
@lbradstreet Yes, indeed. Unmodifiable collections are to prevent receivers from mutating. Thread safety is not implied. That would require an immutable collection instead. |
Get rid of
partitionStates
that creates a newPartitionState
for each state since all the callers do not require it to be a Seq.Modify ReplicaFetcherThread constructor to fix the broken benchmark path.
Committer Checklist (excluded from commit message)