-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-8980: Refactor state-store-level streams metrics #7584
KAFKA-8980: Refactor state-store-level streams metrics #7584
Conversation
Call for review: @guozhangwang @vvcephei @bbejeck |
@@ -61,6 +62,7 @@ | |||
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); | |||
private static final RecordHeaders V_2_CHANGELOG_HEADERS = | |||
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); | |||
private static final String METRIC_SCOPE = "in-memory-suppression"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had to add this metric scope (a.k.a. store type) for the metrics tags. Now the tags would
type=stream-state-metrics,
thread-id=[threadId],
task-id=[taskId],
in-memory-suppression-state-id=[storeName]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the tags look like before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before it was:
type=stream-buffer-metrics,
client-id=[threadId]
task-id=[taskId]
buffer-id=[storeName]
@@ -50,7 +50,7 @@ | |||
|
|||
public enum Version { | |||
LATEST, | |||
FROM_100_TO_24 | |||
FROM_0100_TO_24 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Corrected mistake in naming.
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateToSensor; | ||
import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addValueMetricToSensor; | ||
|
||
public class StateStoreMetrics { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class contains all specifications of state-store-level metrics.
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); | ||
bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metrics for suppression buffers are fetched in this way now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I think not relying on context but passing in parameters explicitly is fine (if that's what you mean has changed here).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted just to highlight the most important parts of the PR.
Actually, I would prefer to add the thread ID to the context and use the context for the metrics. I have already tried to do it, but I ran into a ClassCastException
because we cast ProcessorContext
to InternalProcessorContext
(which IMO should be changed in future if possible). Using the context instead of explicit parameters makes the signature of the methods more robust. This is rather minor issue atm.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for adding the thread ID to the context
streamsMetrics = (StreamsMetricsImpl) context.metrics(); | ||
|
||
putSensor = StateStoreMetrics.putSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
putIfAbsentSensor = StateStoreMetrics.putIfAbsentSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
putAllSensor = StateStoreMetrics.putAllSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
getSensor = StateStoreMetrics.getSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
allSensor = StateStoreMetrics.allSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
rangeSensor = StateStoreMetrics.rangeSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
flushSensor = StateStoreMetrics.flushSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
deleteSensor = StateStoreMetrics.deleteSensor(threadId, taskId, metricsScope, name(), streamsMetrics); | ||
final Sensor restoreSensor = | ||
StateStoreMetrics.restoreSensor(threadId, taskId, metricsScope, name(), streamsMetrics); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Metrics for state stores are fetched in this way now.
final String storeType, | ||
final String storeName, | ||
final StreamsMetricsImpl streamsMetrics) { | ||
final Sensor sensor = streamsMetrics.storeLevelSensor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in KIP-444 this metric would be replaced with the task-level dropped-records
right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My bad. Will fix it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realized that I need PR #7566 merged before I can fix this, because dropped-records
is a task-level sensor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PR #7566 was merged. Rebased the PR and fixed it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could be missing something, but the name still contains expired-window
. I thought we were going with dropped-records
across all metrics. But this could be a misunderstanding on my part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I introduced method droppedRecordsSensorOrExpiredWindowRecordDropSensor()
in TaskMetrics
that is used in the code to get the correct sensor depending on the built-in metrics version. expiredWindowRecordDropSensor()
is only called in droppedRecordsSensorOrExpiredWindowRecordDropSensor()
.
@@ -61,6 +62,7 @@ | |||
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})}); | |||
private static final RecordHeaders V_2_CHANGELOG_HEADERS = | |||
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})}); | |||
private static final String METRIC_SCOPE = "in-memory-suppression"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the tags look like before?
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); | ||
bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I think not relying on context but passing in parameters explicitly is fine (if that's what you mean has changed here).
); | ||
} | ||
|
||
public static Sensor getSensor(final String threadId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a meta comment: in KIP-439 we also discussed about fixing some metric names which are collapsed and messed with the functionality names, and back then the proposal is:
value-by-key
value-by-key-and-time
range-by-key
range-by-key-range
range-by-time-range
range-by-key-and-time-range
range-by-key-range-and-time-range
range-all
Besides the first two, all others are mainly used in IQ only and not in stream processor at all, which means that only the first two are commonly recorded in practice. So I'm thinking maybe we keep the compatibility for the first two while renaming others:
get (in kv-store, it means value-by-key, in window-store, it means value-by-key-time)
range-by-key
range-by-key-range
range-by-time-range
range-by-key-and-time-range
range-by-key-range-and-time-range
range-all
WDYT cc @vvcephei @mjsax who have participated in the discussion of KIP-439 as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @guozhangwang . This seems good to me. It shouldn't be too ambiguous either way, and the documentation would have to explain it, anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack. I am ok with this suggestions -- will keep it in mind for KIP-439.
Btw: it this a change for KIP-444 what needs to be communicated as follow up to the already accepted KIP?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think KIP-444 just keep the metrics names on state stores as is -- we will do the renaming on KIP-439 and communicate that in that KIP.
IN_MEMORY_LRUCACHE_TAG_KEY, | ||
builtInMetricsVersion | ||
); | ||
checkRocksDBMetricsByTag( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for adding the coverage of tag names!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @cadonna I've made a pass. Overall looks good, but given the size of the PR, I'd like to make another pass soon.
cc @mjsax to take another look at the new state store metrics hierarchy as well since KIP-439 would change it too. |
- Refactors metrics according to KIP-444 - Introduces `StateStoreMetrics` as a central provider for state store metrics - Adds metric scope (a.k.a. store type) to the in-memory suppression buffer
d25c5d4
to
f54db7d
Compare
Made another pass on the rebased PR, lgtm. |
Replaced measurements of latency with a common measurement method in |
In JDK 11/Scala 2.12, the following tests failed:
|
Retest this, please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM just one minor clarification question.
bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); | ||
bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor( | ||
threadId, | ||
taskId, | ||
METRIC_SCOPE, | ||
storeName, | ||
streamsMetrics | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 for adding the thread ID to the context
final String storeType, | ||
final String storeName, | ||
final StreamsMetricsImpl streamsMetrics) { | ||
final Sensor sensor = streamsMetrics.storeLevelSensor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could be missing something, but the name still contains expired-window
. I thought we were going with dropped-records
across all metrics. But this could be a misunderstanding on my part.
Merged #7584 into trunk |
Fixed a minor conflict in `.gitignore` and fix compiler errors in KafkaUtilities due to `PartitionReplicaAssignment` rename to `ReplicaAssignment`. * apache-github/trunk: (34 commits) HOTFIX: Try to complete Send even if no bytes were written (apache#7622) KAFKA-9080: Revert the check added to validate non-compressed record batch does have continuous incremental offsets KAFKA-8972 (2.4 blocker): TaskManager state should always be updated after rebalance (apache#7620) MINOR: Fix Kafka Streams JavaDocs with regard to new StreamJoined class (apache#7627) MINOR: Fix sensor retrieval in stand0by task's constructor (apache#7632) MINOR: Replace some Java 7 style code with Java 8 style (apache#7623) KAFKA-8868: Generate SubscriptionInfo protocol message (apache#7248) MINOR: Correctly mark offset expiry in GroupMetadataManager's OffsetExpired metric KAFKA-8972 (2.4 blocker): bug fix for restoring task (apache#7617) KAFKA-9093: NullPointerException in KafkaConsumer with group.instance.id (apache#7590) KAFKA-8980: Refactor state-store-level streams metrics (apache#7584) MINOR: Fix documentation for updateCurrentReassignment (apache#7611) MINOR: Preserve backwards-compatibility by renaming the AlterPartitionReassignment metric to PartitionReassignment KAFKA-8972 (2.4 blocker): clear all state for zombie task on TaskMigratedException (apache#7608) KAFKA-9077: Fix reading of metrics of Streams' SimpleBenchmark (apache#7610) KAFKA-8972 (2.4 blocker): correctly release lost partitions during consumer.unsubscribe() (apache#7441) MINOR: improve logging of tasks on shutdown (apache#7597) KAFKA-9048 Pt1: Remove Unnecessary lookup in Fetch Building (apache#7576) MINOR: Fix command examples in kafka-reassign-partitions.sh docs (apache#7583) KAFKA-9102; Increase default zk session timeout and replica max lag [KIP-537] (apache#7596) ...
StateStoreMetrics
as a central provider for state store metricsCommitter Checklist (excluded from commit message)