KAFKA-8980: Refactor state-store-level streams metrics #7584

Merged 6 commits on Oct 30, 2019
StreamsMetricsImpl.java
@@ -46,12 +46,13 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;

public class StreamsMetricsImpl implements StreamsMetrics {

public enum Version {
LATEST,
-FROM_100_TO_24
+FROM_0100_TO_24
Comment (Contributor, author): Corrected mistake in naming.
}

static class ImmutableMetricValue<T> implements Gauge<T> {
@@ -140,14 +141,13 @@ public int hashCode() {
public static final String THREAD_LEVEL_GROUP_0100_TO_24 = GROUP_PREFIX_WO_DELIMITER + GROUP_SUFFIX;
public static final String TASK_LEVEL_GROUP = GROUP_PREFIX + "task" + GROUP_SUFFIX;
public static final String PROCESSOR_NODE_LEVEL_GROUP = GROUP_PREFIX + "processor-node" + GROUP_SUFFIX;
-public static final String STATE_LEVEL_GROUP_SUFFIX = "-state" + GROUP_SUFFIX;
-public static final String STATE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
+public static final String STATE_STORE_LEVEL_GROUP = GROUP_PREFIX + "state" + GROUP_SUFFIX;
public static final String BUFFER_LEVEL_GROUP_0100_TO_24 = GROUP_PREFIX + "buffer" + GROUP_SUFFIX;
public static final String CACHE_LEVEL_GROUP = GROUP_PREFIX + "record-cache" + GROUP_SUFFIX;

public static final String TOTAL_DESCRIPTION = "The total number of ";
public static final String RATE_DESCRIPTION = "The average per-second number of ";

public static final String EXPIRED_WINDOW_RECORD_DROP = "expired-window-record-drop";
public static final String LATE_RECORD_DROP = "late-record-drop";

public StreamsMetricsImpl(final Metrics metrics, final String clientId, final String builtInMetricsVersion) {
@@ -164,7 +164,7 @@ private static Version parseBuiltInMetricsVersion(final String builtInMetricsVer
if (builtInMetricsVersion.equals(StreamsConfig.METRICS_LATEST)) {
return Version.LATEST;
} else {
-return Version.FROM_100_TO_24;
+return Version.FROM_0100_TO_24;
}
}
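Note: the version gate above is driven by the built.in.metrics.version client config from KIP-444. A minimal configuration sketch; the "0.10.0-2.4" value string is an assumption from this PR's timeframe:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    // use the reworked (latest) built-in metrics structure
    props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, StreamsConfig.METRICS_LATEST);
    // or keep the pre-KIP-444 structure (assumed value string):
    // props.put(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, "0.10.0-2.4");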

@@ -399,7 +399,7 @@ public Map<String, String> cacheLevelTagMap(final String threadId,
final String taskId,
final String storeName) {
final Map<String, String> tagMap = new LinkedHashMap<>();
-if (version == Version.FROM_100_TO_24) {
+if (version == Version.FROM_0100_TO_24) {
tagMap.put(THREAD_ID_TAG_0100_TO_24, threadId);
} else {
tagMap.put(THREAD_ID_TAG, threadId);
@@ -662,16 +662,24 @@ public static void addInvocationRateAndCountToSensor(final Sensor sensor,
final Map<String, String> tags,
final String operation,
final String descriptionOfRate,
-final String descriptionOfInvocation) {
+final String descriptionOfCount) {
+addInvocationRateToSensor(sensor, group, tags, operation, descriptionOfRate);
sensor.add(
new MetricName(
operation + TOTAL_SUFFIX,
group,
-descriptionOfInvocation,
+descriptionOfCount,
tags
),
new CumulativeCount()
);
}
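Note: a hypothetical call site, assuming the usual "-rate"/"-total" metric-name suffixes; for an operation named "put" this registers "put-rate" and "put-total" in the given group:

    addInvocationRateAndCountToSensor(
        sensor,                       // an existing store-level sensor (assumed)
        STATE_STORE_LEVEL_GROUP,
        tags,                         // e.g. a store-level tag map (assumed)
        "put",
        RATE_DESCRIPTION + "puts",    // "The average per-second number of puts"
        TOTAL_DESCRIPTION + "puts"    // "The total number of puts"
    );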

public static void addInvocationRateToSensor(final Sensor sensor,
final String group,
final Map<String, String> tags,
final String operation,
final String descriptionOfRate) {
sensor.add(
new MetricName(
operation + RATE_SUFFIX,
@@ -777,6 +785,21 @@ public static void maybeMeasureLatency(final Runnable actionToMeasure,
}
}

public static <T> T maybeMeasureLatency(final Supplier<T> actionToMeasure,
final Time time,
final Sensor sensor) {
if (sensor.shouldRecord()) {
final long startNs = time.nanoseconds();
try {
return actionToMeasure.get();
} finally {
sensor.record(time.nanoseconds() - startNs);
}
} else {
return actionToMeasure.get();
}
}
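Note: a minimal usage sketch for the new Supplier overload; the wrapped-store read and sensor name are hypothetical:

    // latency is recorded only when the sensor's recording level is active;
    // otherwise the supplier runs without taking the two nanosecond timestamps
    final byte[] value = maybeMeasureLatency(
        () -> wrapped.get(key),   // hypothetical inner-store call
        Time.SYSTEM,
        fetchLatencySensor        // hypothetical latency sensor
    );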

/**
* Deletes a sensor and its parents, if any
*/
TaskMetrics.java
@@ -19,6 +19,7 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.Version;
+import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

import java.util.Map;

@@ -179,12 +180,23 @@ public static Sensor droppedRecordsSensor(final String threadId,
public static Sensor droppedRecordsSensorOrSkippedRecordsSensor(final String threadId,
final String taskId,
final StreamsMetricsImpl streamsMetrics) {
-if (streamsMetrics.version() == Version.FROM_100_TO_24) {
+if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
return ThreadMetrics.skipRecordSensor(threadId, streamsMetrics);
}
return droppedRecordsSensor(threadId, taskId, streamsMetrics);
}

public static Sensor droppedRecordsSensorOrExpiredWindowRecordDropSensor(final String threadId,
final String taskId,
final String storeType,
final String storeName,
final StreamsMetricsImpl streamsMetrics) {
if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
return StateStoreMetrics.expiredWindowRecordDropSensor(threadId, taskId, storeType, storeName, streamsMetrics);
}
return droppedRecordsSensor(threadId, taskId, streamsMetrics);
}
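Note: a sketch of the new method's version gate; the store type string is hypothetical and the sensor names follow KIP-444:

    final Sensor sensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
        threadId, taskId, "rocksdb-window", storeName, streamsMetrics);
    // Version.FROM_0100_TO_24 -> store-level "expired-window-record-drop" sensor
    // Version.LATEST          -> task-level "dropped-records" sensor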

private static Sensor invocationRateAndCountSensor(final String threadId,
final String taskId,
final String metricName,
ThreadMetrics.java
@@ -108,7 +108,7 @@ public static Sensor closeTaskSensor(final String threadId,

public static Sensor skipRecordSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
-if (streamsMetrics.version() == Version.FROM_100_TO_24) {
+if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
return invocationRateAndCountSensor(
threadId,
SKIP_RECORD,
@@ -179,7 +179,7 @@ public static Sensor punctuateSensor(final String threadId,

public static Sensor commitOverTasksSensor(final String threadId,
final StreamsMetricsImpl streamsMetrics) {
-if (streamsMetrics.version() == Version.FROM_100_TO_24) {
+if (streamsMetrics.version() == Version.FROM_0100_TO_24) {
final Sensor commitOverTasksSensor =
streamsMetrics.threadLevelSensor(threadId, COMMIT, Sensor.RecordingLevel.DEBUG);
final Map<String, String> tagMap = streamsMetrics.taskLevelTagMap(threadId, ROLLUP_VALUE);
AbstractRocksDBSegmentedBytesStore.java
@@ -27,6 +27,7 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
@@ -40,9 +41,6 @@
import java.util.Map;
import java.util.Set;

-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;

public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBSegmentedBytesStore.class);
private final String name;
@@ -148,7 +146,7 @@ public void put(final Bytes key,
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment == null) {
expiredRecordSensor.record();
-LOG.debug("Skipping record for expired segment.");
+LOG.warn("Skipping record for expired segment.");
} else {
segment.put(key, value);
}
@@ -177,18 +175,12 @@ public void init(final ProcessorContext context,
final String threadId = Thread.currentThread().getName();
final String taskName = context.taskId().toString();

-expiredRecordSensor = metrics.storeLevelSensor(
+expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
threadId,
taskName,
+metricScope,
name(),
-EXPIRED_WINDOW_RECORD_DROP,
-Sensor.RecordingLevel.INFO
-);
-addInvocationRateAndCountToSensor(
-expiredRecordSensor,
-"stream-" + metricScope + "-metrics",
-metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
-EXPIRED_WINDOW_RECORD_DROP
+metrics
);

segments.openExisting(this.context, observedStreamTime);
InMemorySessionStore.java
@@ -35,14 +35,12 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;

public class InMemorySessionStore implements SessionStore<Bytes, byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(InMemorySessionStore.class);
@@ -77,18 +75,12 @@ public void init(final ProcessorContext context, final StateStore root) {
final StreamsMetricsImpl metrics = ((InternalProcessorContext) context).metrics();
final String threadId = Thread.currentThread().getName();
final String taskName = context.taskId().toString();
-expiredRecordSensor = metrics.storeLevelSensor(
+expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
threadId,
taskName,
-name(),
-EXPIRED_WINDOW_RECORD_DROP,
-Sensor.RecordingLevel.INFO
-);
-addInvocationRateAndCountToSensor(
-expiredRecordSensor,
-"stream-" + metricScope + "-metrics",
-metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
-EXPIRED_WINDOW_RECORD_DROP
+metricScope,
+name,
+metrics
);

if (root != null) {
Expand All @@ -106,7 +98,7 @@ public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {

if (windowEndTimestamp <= observedStreamTime - retentionPeriod) {
expiredRecordSensor.record();
-LOG.debug("Skipping record for expired segment.");
+LOG.warn("Skipping record for expired segment.");
} else {
if (aggregate != null) {
endTimeMap.computeIfAbsent(windowEndTimestamp, t -> new ConcurrentSkipListMap<>());
InMemoryTimeOrderedKeyValueBuffer.java
@@ -35,9 +35,10 @@
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordQueue;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.ValueAndTimestamp;
-import org.apache.kafka.streams.state.internals.metrics.Sensors;
+import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;

import java.nio.ByteBuffer;
import java.util.Collection;
@@ -61,6 +62,7 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 1})});
private static final RecordHeaders V_2_CHANGELOG_HEADERS =
new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {(byte) 2})});
+private static final String METRIC_SCOPE = "in-memory-suppression";
Comment (Contributor, author): I had to add this metric scope (a.k.a. store type) for the metrics tags. Now the tags look like:

type=stream-state-metrics,
thread-id=[threadId],
task-id=[taskId],
in-memory-suppression-state-id=[storeName]

Comment (Contributor): What did the tags look like before?

Comment (Contributor, author): Before it was:

type=stream-buffer-metrics,
client-id=[threadId],
task-id=[taskId],
buffer-id=[storeName]
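Note: rendered as JMX object names (assuming the kafka.streams domain), the change reads:

before: kafka.streams:type=stream-buffer-metrics,client-id=[threadId],task-id=[taskId],buffer-id=[storeName]
after:  kafka.streams:type=stream-state-metrics,thread-id=[threadId],task-id=[taskId],in-memory-suppression-state-id=[storeName]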


private final Map<Bytes, BufferKey> index = new HashMap<>();
private final TreeMap<BufferKey, BufferValue> sortedMap = new TreeMap<>();
@@ -78,6 +80,9 @@ public final class InMemoryTimeOrderedKeyValueBuffer<K, V> implements TimeOrdere
private String changelogTopic;
private Sensor bufferSizeSensor;
private Sensor bufferCountSensor;
+private StreamsMetricsImpl streamsMetrics;
+private String threadId;
+private String taskId;

private volatile boolean open;

@@ -181,10 +186,25 @@ public void setSerdesIfNull(final Serde<K> keySerde, final Serde<V> valueSerde)

@Override
public void init(final ProcessorContext context, final StateStore root) {
+taskId = context.taskId().toString();
final InternalProcessorContext internalProcessorContext = (InternalProcessorContext) context;

-bufferSizeSensor = Sensors.createBufferSizeSensor(this, internalProcessorContext);
-bufferCountSensor = Sensors.createBufferCountSensor(this, internalProcessorContext);
+streamsMetrics = internalProcessorContext.metrics();

+threadId = Thread.currentThread().getName();
+bufferSizeSensor = StateStoreMetrics.suppressionBufferSizeSensor(
+threadId,
+taskId,
+METRIC_SCOPE,
+storeName,
+streamsMetrics
+);
+bufferCountSensor = StateStoreMetrics.suppressionBufferCountSensor(
+threadId,
+taskId,
+METRIC_SCOPE,
+storeName,
+streamsMetrics
+);
Comment on lines +194 to +207 (Contributor, author): Metrics for suppression buffers are fetched in this way now.

Comment (Contributor): Sounds good, I think not relying on the context but passing in parameters explicitly is fine (if that's what you mean changed here).

Comment (Contributor, author, @cadonna, Oct 24, 2019): I just wanted to highlight the most important parts of the PR.

Actually, I would prefer to add the thread ID to the context and use the context for the metrics. I have already tried to do that, but I ran into a ClassCastException because we cast ProcessorContext to InternalProcessorContext (which IMO should be changed in the future if possible). Using the context instead of explicit parameters would make the method signatures more robust. This is a rather minor issue atm.

Comment (Contributor): +1 for adding the thread ID to the context


context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch);
if (loggingEnabled) {
@@ -210,6 +230,7 @@ public void close() {
memBufferSize = 0;
minTimestamp = Long.MAX_VALUE;
updateBufferMetrics();
+streamsMetrics.removeAllStoreLevelSensors(threadId, taskId, storeName);
}

@Override
InMemoryWindowStore.java
@@ -33,6 +33,7 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.KeyValueIterator;
@@ -42,8 +43,6 @@
import java.util.Map;
import java.util.NoSuchElementException;

-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.EXPIRED_WINDOW_RECORD_DROP;
-import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addInvocationRateAndCountToSensor;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreKeyBytes;
import static org.apache.kafka.streams.state.internals.WindowKeySchema.extractStoreTimestamp;

@@ -93,18 +92,12 @@ public void init(final ProcessorContext context, final StateStore root) {
final StreamsMetricsImpl metrics = this.context.metrics();
final String threadId = Thread.currentThread().getName();
final String taskName = context.taskId().toString();
-expiredRecordSensor = metrics.storeLevelSensor(
+expiredRecordSensor = TaskMetrics.droppedRecordsSensorOrExpiredWindowRecordDropSensor(
threadId,
taskName,
-name(),
-EXPIRED_WINDOW_RECORD_DROP,
-Sensor.RecordingLevel.INFO
-);
-addInvocationRateAndCountToSensor(
-expiredRecordSensor,
-"stream-" + metricScope + "-metrics",
-metrics.storeLevelTagMap(threadId, taskName, metricScope, name()),
-EXPIRED_WINDOW_RECORD_DROP
+metricScope,
+name,
+metrics
);

if (root != null) {