Skip to content

Commit

Permalink
[monitor][broker] add metrics for BatchMetadataStore (#17072)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjiuming authored Nov 1, 2022
1 parent fe19639 commit adae4ae
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,86 @@ public void testMetadataStoreStats() throws Exception {
}
}

@Test
public void testBatchMetadataStoreMetrics() throws Exception {
String ns = "prop/ns-abc1";
admin.namespaces().createNamespace(ns);

String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
String subName = "my-sub1";

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic).create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic).subscriptionName(subName).subscribe();

for (int i = 0; i < 100; i++) {
producer.newMessage().value(UUID.randomUUID().toString()).send();
}

for (;;) {
Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
if (message == null) {
break;
}
consumer.acknowledge(message);
}

ByteArrayOutputStream output = new ByteArrayOutputStream();
PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
String metricsStr = output.toString();
Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);

Collection<PrometheusMetricsTest.Metric> executorQueueSize = metricsMap.get("pulsar_batch_metadata_store_executor_queue_size");
Collection<PrometheusMetricsTest.Metric> opsWaiting = metricsMap.get("pulsar_batch_metadata_store_queue_wait_time_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> batchExecuteTime = metricsMap.get("pulsar_batch_metadata_store_batch_execute_time_ms" + "_sum");
Collection<PrometheusMetricsTest.Metric> opsPerBatch = metricsMap.get("pulsar_batch_metadata_store_batch_size" + "_sum");

Assert.assertTrue(executorQueueSize.size() > 1);
Assert.assertTrue(opsWaiting.size() > 1);
Assert.assertTrue(batchExecuteTime.size() > 0);
Assert.assertTrue(opsPerBatch.size() > 0);

for (PrometheusMetricsTest.Metric m : executorQueueSize) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value >= 0);
}
for (PrometheusMetricsTest.Metric m : opsWaiting) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value >= 0);
}

for (PrometheusMetricsTest.Metric m : batchExecuteTime) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
}

for (PrometheusMetricsTest.Metric m : opsPerBatch) {
Assert.assertEquals(m.tags.get("cluster"), "test");
String metadataStoreName = m.tags.get("name");
Assert.assertNotNull(metadataStoreName);
Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
|| metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
Assert.assertTrue(m.value > 0);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.MpscUnboundedArrayQueue;

Expand All @@ -50,7 +51,8 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
private final int maxDelayMillis;
private final int maxOperations;
private final int maxSize;
private MetadataEventSynchronizer synchronizer;
private final MetadataEventSynchronizer synchronizer;
private final BatchMetadataStoreStats batchMetadataStoreStats;

protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
super(conf.getMetadataStoreName());
Expand All @@ -74,6 +76,8 @@ protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
// update synchronizer and register sync listener
synchronizer = conf.getSynchronizer();
registerSyncLister(Optional.ofNullable(synchronizer));
this.batchMetadataStoreStats =
new BatchMetadataStoreStats(metadataStoreName, executor);
}

@Override
Expand All @@ -87,13 +91,14 @@ public void close() throws Exception {
scheduledTask.cancel(true);
}
super.close();
this.batchMetadataStoreStats.close();
}

private void flush() {
while (!readOps.isEmpty()) {
List<MetadataOp> ops = new ArrayList<>();
readOps.drain(ops::add, maxOperations);
batchOperation(ops);
internalBatchOperation(ops);
}

while (!writeOps.isEmpty()) {
Expand All @@ -114,7 +119,7 @@ private void flush() {
batchSize += op.size();
ops.add(writeOps.poll());
}
batchOperation(ops);
internalBatchOperation(ops);
}

flushInProgress.set(false);
Expand Down Expand Up @@ -158,16 +163,26 @@ private void enqueue(MessagePassingQueue<MetadataOp> queue, MetadataOp op) {
if (enabled) {
if (!queue.offer(op)) {
// Execute individually if we're failing to enqueue
batchOperation(Collections.singletonList(op));
internalBatchOperation(Collections.singletonList(op));
return;
}
if (queue.size() > maxOperations && flushInProgress.compareAndSet(false, true)) {
executor.execute(this::flush);
}
} else {
batchOperation(Collections.singletonList(op));
internalBatchOperation(Collections.singletonList(op));
}
}

private void internalBatchOperation(List<MetadataOp> ops) {
long now = System.currentTimeMillis();
for (MetadataOp op : ops) {
this.batchMetadataStoreStats.recordOpWaiting(now - op.created());
}
this.batchOperation(ops);
this.batchMetadataStoreStats.recordOpsInBatch(ops.size());
this.batchMetadataStoreStats.recordBatchExecuteTime(System.currentTimeMillis() - now);
}

protected abstract void batchOperation(List<MetadataOp> ops);
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ enum Type {

int size();

long created();

default OpGet asGet() {
return (OpGet) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class OpDelete implements MetadataOp {
private final String path;
private final Optional<Long> optExpectedVersion;
public final long created = System.currentTimeMillis();

private final CompletableFuture<Void> future = new CompletableFuture<>();

Expand All @@ -40,4 +41,9 @@ public Type getType() {
public int size() {
return path.length();
}

@Override
public long created() {
return this.created;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
public class OpGet implements MetadataOp {

private final String path;
public final long created = System.currentTimeMillis();
private final CompletableFuture<Optional<GetResult>> future = new CompletableFuture<>();

@Override
Expand All @@ -40,4 +41,9 @@ public Type getType() {
public int size() {
return path.length();
}

@Override
public long created() {
return this.created;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
public class OpGetChildren implements MetadataOp {

private final String path;
public final long created = System.currentTimeMillis();
private final CompletableFuture<List<String>> future = new CompletableFuture<>();

@Override
Expand All @@ -39,4 +40,9 @@ public Type getType() {
public int size() {
return path.length();
}

@Override
public long created() {
return this.created;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class OpPut implements MetadataOp {
private final Optional<Long> optExpectedVersion;
private final EnumSet<CreateOption> options;

public final long created = System.currentTimeMillis();
private final CompletableFuture<Stat> future = new CompletableFuture<>();

public boolean isEphemeral() {
Expand All @@ -49,4 +50,9 @@ public Type getType() {
public int size() {
return path.length() + (data != null ? data.length : 0);
}

@Override
public long created() {
return this.created;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.impl.stats;

import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;

public final class BatchMetadataStoreStats implements AutoCloseable {
private static final double[] BUCKETS = new double[]{1, 5, 10, 20, 50, 100, 200, 500, 1000};
private static final String NAME = "name";

private static final Gauge EXECUTOR_QUEUE_SIZE = Gauge
.build("pulsar_batch_metadata_store_executor_queue_size", "-")
.labelNames(NAME)
.register();
private static final Histogram OPS_WAITING = Histogram
.build("pulsar_batch_metadata_store_queue_wait_time", "-")
.unit("ms")
.labelNames(NAME)
.buckets(BUCKETS)
.register();
private static final Histogram BATCH_EXECUTE_TIME = Histogram
.build("pulsar_batch_metadata_store_batch_execute_time", "-")
.unit("ms")
.labelNames(NAME)
.buckets(BUCKETS)
.register();
private static final Histogram OPS_PER_BATCH = Histogram
.build("pulsar_batch_metadata_store_batch_size", "-")
.labelNames(NAME)
.buckets(BUCKETS)
.register();

private final AtomicBoolean closed = new AtomicBoolean(false);
private final ThreadPoolExecutor executor;
private final String metadataStoreName;

private final Histogram.Child batchOpsWaitingChild;
private final Histogram.Child batchExecuteTimeChild;
private final Histogram.Child opsPerBatchChild;

public BatchMetadataStoreStats(String metadataStoreName, ExecutorService executor) {
if (executor instanceof ThreadPoolExecutor tx) {
this.executor = tx;
} else {
this.executor = null;
}
this.metadataStoreName = metadataStoreName;

EXECUTOR_QUEUE_SIZE.setChild(new Gauge.Child() {
@Override
public double get() {
return BatchMetadataStoreStats.this.executor == null ? 0 :
BatchMetadataStoreStats.this.executor.getQueue().size();
}
}, metadataStoreName);

this.batchOpsWaitingChild = OPS_WAITING.labels(metadataStoreName);
this.batchExecuteTimeChild = BATCH_EXECUTE_TIME.labels(metadataStoreName);
this.opsPerBatchChild = OPS_PER_BATCH.labels(metadataStoreName);

}

public void recordOpWaiting(long millis) {
this.batchOpsWaitingChild.observe(millis);
}

public void recordBatchExecuteTime(long millis) {
this.batchExecuteTimeChild.observe(millis);
}

public void recordOpsInBatch(int ops) {
this.opsPerBatchChild.observe(ops);
}

@Override
public void close() throws Exception {
if (closed.compareAndSet(false, true)) {
EXECUTOR_QUEUE_SIZE.remove(this.metadataStoreName);
OPS_WAITING.remove(this.metadataStoreName);
BATCH_EXECUTE_TIME.remove(this.metadataStoreName);
OPS_PER_BATCH.remove(metadataStoreName);
}
}
}

0 comments on commit adae4ae

Please sign in to comment.