From 732049fc6ca1beb046deb43057be2b130736fbca Mon Sep 17 00:00:00 2001 From: Tao Jiuming <95597048+tjiuming@users.noreply.github.com> Date: Fri, 15 Apr 2022 15:00:38 +0800 Subject: [PATCH] Offloader metrics (#13833) ### Motivation Currently, there is no offload metrics for tiered storage, so it is very hard for us to debug the performance issues. For example , we can not find why offload is slow or why read offload is slow. For above reasons. we need to add some offload metrics for monitoring. ### Modifications Add metrics during offload procedure and read offload data procedure. Including offloadTime, offloadError, offloadRate, readLedgerLatency, writeStoreLatency, writeStoreError, readOffloadIndexLatency, readOffloadDataLatency, readOffloadRate, readOffloadError. --- .../mledger/LedgerOffloaderFactory.java | 8 +- .../mledger/LedgerOffloaderStats.java | 62 +++ .../mledger/LedgerOffloaderStatsDisable.java | 80 ++++ .../impl/LedgerOffloaderStatsImpl.java | 352 ++++++++++++++++++ .../apache/pulsar/broker/PulsarService.java | 14 +- .../PrometheusMetricsGenerator.java | 7 +- .../stats/LedgerOffloaderMetricsTest.java | 187 ++++++++++ .../sql/presto/PulsarConnectorCache.java | 17 +- .../sql/presto/PulsarConnectorConfig.java | 36 ++ .../FileSystemLedgerOffloaderFactory.java | 6 +- .../impl/FileStoreBackedReadHandleImpl.java | 27 +- .../FileSystemManagedLedgerOffloader.java | 54 ++- .../offload/filesystem/FileStoreTestBase.java | 6 +- .../FileSystemManagedLedgerOffloaderTest.java | 36 +- .../jcloud/JCloudLedgerOffloaderFactory.java | 6 +- .../impl/BlobStoreBackedInputStreamImpl.java | 23 ++ .../impl/BlobStoreBackedReadHandleImpl.java | 15 +- .../impl/BlobStoreBackedReadHandleImplV2.java | 12 +- .../impl/BlobStoreManagedLedgerOffloader.java | 38 +- .../BlockAwareSegmentInputStreamImpl.java | 22 +- ...reManagedLedgerOffloaderStreamingTest.java | 7 +- .../BlobStoreManagedLedgerOffloaderTest.java | 52 ++- 22 files changed, 996 insertions(+), 71 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java index c854996cd1685..bb94cee6fde49 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderFactory.java @@ -52,7 +52,8 @@ public interface LedgerOffloaderFactory { */ T create(OffloadPoliciesImpl offloadPolicies, Map userMetadata, - OrderedScheduler scheduler) + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException; /** @@ -68,8 +69,9 @@ T create(OffloadPoliciesImpl offloadPolicies, default T create(OffloadPoliciesImpl offloadPolicies, Map userMetadata, SchemaStorage schemaStorage, - OrderedScheduler scheduler) + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { - return create(offloadPolicies, userMetadata, scheduler); + return create(offloadPolicies, userMetadata, scheduler, offloaderStats); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java new file mode 100644 index 0000000000000..c9330ff9e056c --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStats.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; +import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl; + + +/** + * Management Bean for a {@link LedgerOffloader}. + */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable +public interface LedgerOffloaderStats extends AutoCloseable { + + void recordOffloadError(String topic); + + void recordOffloadBytes(String topic, long size); + + void recordReadLedgerLatency(String topic, long latency, TimeUnit unit); + + void recordWriteToStorageError(String topic); + + void recordReadOffloadError(String topic); + + void recordReadOffloadBytes(String topic, long size); + + void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit); + + void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit); + + void recordDeleteOffloadOps(String topic, boolean succeed); + + + static LedgerOffloaderStats create(boolean exposeManagedLedgerStats, boolean exposeTopicLevelMetrics, + ScheduledExecutorService scheduler, int interval) { + if (!exposeManagedLedgerStats) { + return LedgerOffloaderStatsDisable.INSTANCE; + } + + return LedgerOffloaderStatsImpl.getInstance(exposeTopicLevelMetrics, scheduler, interval); + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java new file mode 100644 index 0000000000000..862d466067e69 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/LedgerOffloaderStatsDisable.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; + +import java.util.concurrent.TimeUnit; + +class LedgerOffloaderStatsDisable implements LedgerOffloaderStats { + + static final LedgerOffloaderStats INSTANCE = new LedgerOffloaderStatsDisable(); + + private LedgerOffloaderStatsDisable() { + + } + + @Override + public void recordOffloadError(String topic) { + + } + + @Override + public void recordOffloadBytes(String topic, long size) { + + } + + @Override + public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) { + + } + + @Override + public void recordWriteToStorageError(String topic) { + + } + + @Override + public void recordReadOffloadError(String topic) { + + } + + @Override + public void recordReadOffloadBytes(String topic, long size) { + + } + + @Override + public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) { + + } + + @Override + public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) { + + } + + @Override + public void recordDeleteOffloadOps(String topic, boolean succeed) { + + } + + @Override + public void close() throws Exception { + + } +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java new file mode 100644 index 0000000000000..1c21cd56445e2 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/LedgerOffloaderStatsImpl.java @@ -0,0 +1,352 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import com.google.common.annotations.VisibleForTesting; +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Counter; +import io.prometheus.client.Gauge; +import io.prometheus.client.Summary; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Collectors; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.naming.TopicName; + +public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Runnable { + private static final String TOPIC_LABEL = "topic"; + private static final String NAMESPACE_LABEL = "namespace"; + private static final String UNKNOWN = "unknown"; + private static final String STATUS = "status"; + private static final String SUCCEED = "succeed"; + private static final String FAILED = "failed"; + + private final boolean exposeTopicLevelMetrics; + private final int interval; + + private final Counter offloadError; + private final Gauge offloadRate; + private final Counter deleteOffloadOps; + private final Summary readLedgerLatency; + private final Counter writeStorageError; + private final Counter readOffloadError; + private final Gauge readOffloadRate; + private final Summary readOffloadIndexLatency; + private final Summary readOffloadDataLatency; + + private final Map topicAccess; + private final Map topic2Namespace; + private final Map> offloadAndReadOffloadBytesMap; + + final AtomicBoolean closed = new AtomicBoolean(false); + + private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics, + ScheduledExecutorService scheduler, int interval) { + this.interval = interval; + this.exposeTopicLevelMetrics = exposeTopicLevelMetrics; + if (null != scheduler) { + scheduler.scheduleAtFixedRate(this, interval, interval, TimeUnit.SECONDS); + } + + this.topicAccess = new ConcurrentHashMap<>(); + this.topic2Namespace = new ConcurrentHashMap<>(); + this.offloadAndReadOffloadBytesMap = new ConcurrentHashMap<>(); + + String[] labels = exposeTopicLevelMetrics + ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL} : new String[]{NAMESPACE_LABEL}; + + this.offloadError = Counter.build("brk_ledgeroffloader_offload_error", "-") + .labelNames(labels).create().register(); + this.offloadRate = Gauge.build("brk_ledgeroffloader_offload_rate", "-") + .labelNames(labels).create().register(); + + this.readOffloadError = Counter.build("brk_ledgeroffloader_read_offload_error", "-") + .labelNames(labels).create().register(); + this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-") + .labelNames(labels).create().register(); + this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-") + .labelNames(labels).create().register(); + + this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-") + .labelNames(labels).create().register(); + this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-") + .labelNames(labels).create().register(); + this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-") + .labelNames(labels).create().register(); + + String[] deleteOpsLabels = exposeTopicLevelMetrics + ? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS}; + this.deleteOffloadOps = Counter.build("brk_ledgeroffloader_delete_offload_ops", "-") + .labelNames(deleteOpsLabels).create().register(); + } + + + private static LedgerOffloaderStats instance; + public static synchronized LedgerOffloaderStats getInstance(boolean exposeTopicLevelMetrics, + ScheduledExecutorService scheduler, int interval) { + if (null == instance) { + instance = new LedgerOffloaderStatsImpl(exposeTopicLevelMetrics, scheduler, interval); + } + + return instance; + } + + @Override + public void recordOffloadError(String topic) { + String[] labelValues = this.labelValues(topic); + this.offloadError.labels(labelValues).inc(); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordOffloadBytes(String topic, long size) { + topic = StringUtils.isBlank(topic) ? UNKNOWN : topic; + Pair pair = this.offloadAndReadOffloadBytesMap + .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder())); + pair.getLeft().add(size); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadLedgerLatency(String topic, long latency, TimeUnit unit) { + String[] labelValues = this.labelValues(topic); + this.readLedgerLatency.labels(labelValues).observe(unit.toMicros(latency)); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordWriteToStorageError(String topic) { + String[] labelValues = this.labelValues(topic); + this.writeStorageError.labels(labelValues).inc(); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadOffloadError(String topic) { + String[] labelValues = this.labelValues(topic); + this.readOffloadError.labels(labelValues).inc(); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadOffloadBytes(String topic, long size) { + topic = StringUtils.isBlank(topic) ? UNKNOWN : topic; + Pair pair = this.offloadAndReadOffloadBytesMap + .computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder())); + pair.getRight().add(size); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadOffloadIndexLatency(String topic, long latency, TimeUnit unit) { + String[] labelValues = this.labelValues(topic); + this.readOffloadIndexLatency.labels(labelValues).observe(unit.toMicros(latency)); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordReadOffloadDataLatency(String topic, long latency, TimeUnit unit) { + String[] labelValues = this.labelValues(topic); + this.readOffloadDataLatency.labels(labelValues).observe(unit.toMicros(latency)); + this.addOrUpdateTopicAccess(topic); + } + + @Override + public void recordDeleteOffloadOps(String topic, boolean succeed) { + String status = succeed ? SUCCEED : FAILED; + String[] labelValues = this.labelValues(topic, status); + this.deleteOffloadOps.labels(labelValues).inc(); + this.addOrUpdateTopicAccess(topic); + } + + private void addOrUpdateTopicAccess(String topic) { + topic = StringUtils.isBlank(topic) ? UNKNOWN : topic; + this.topicAccess.put(topic, System.currentTimeMillis()); + } + + private String[] labelValues(String topic, String status) { + if (StringUtils.isBlank(topic)) { + return exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN, status} : new String[]{UNKNOWN, status}; + } + String namespace = this.getNamespace(topic); + return this.exposeTopicLevelMetrics ? new String[]{namespace, topic, status} : new String[]{namespace, status}; + } + + private String[] labelValues(String topic) { + if (StringUtils.isBlank(topic)) { + return this.exposeTopicLevelMetrics ? new String[]{UNKNOWN, UNKNOWN} : new String[]{UNKNOWN}; + } + String namespace = this.getNamespace(topic); + return this.exposeTopicLevelMetrics ? new String[]{namespace, topic} : new String[]{namespace}; + } + + private String getNamespace(String topic) { + return this.topic2Namespace.computeIfAbsent(topic, t -> { + try { + return TopicName.get(t).getNamespace(); + } catch (IllegalArgumentException ex) { + return UNKNOWN; + } + }); + } + + private void cleanExpiredTopicMetrics() { + long now = System.currentTimeMillis(); + long timeout = TimeUnit.MINUTES.toMillis(2); + + topicAccess.entrySet().removeIf(entry -> { + String topic = entry.getKey(); + long access = entry.getValue(); + + if (now - access >= timeout) { + this.topic2Namespace.remove(topic); + this.offloadAndReadOffloadBytesMap.remove(topic); + String[] labelValues = this.labelValues(topic); + this.offloadError.remove(labelValues); + this.offloadRate.remove(labelValues); + this.readLedgerLatency.remove(labelValues); + this.writeStorageError.remove(labelValues); + this.readOffloadError.remove(labelValues); + this.readOffloadRate.remove(labelValues); + this.readOffloadIndexLatency.remove(labelValues); + this.readOffloadDataLatency.remove(labelValues); + + labelValues = this.labelValues(topic, SUCCEED); + this.deleteOffloadOps.remove(labelValues); + labelValues = this.labelValues(topic, FAILED); + this.deleteOffloadOps.remove(labelValues); + + return true; + } + return false; + }); + } + + @Override + public void run() { + this.cleanExpiredTopicMetrics(); + + this.offloadAndReadOffloadBytesMap.forEach((topic, pair) -> { + String[] labelValues = this.labelValues(topic); + + double interval = this.interval; + long offloadBytes = pair.getLeft().sumThenReset(); + long readOffloadBytes = pair.getRight().sumThenReset(); + + this.offloadRate.labels(labelValues).set(offloadBytes / interval); + this.readOffloadRate.labels(labelValues).set(readOffloadBytes / interval); + }); + } + + @Override + public void close() throws Exception { + if (instance == this && this.closed.compareAndSet(false, true)) { + CollectorRegistry.defaultRegistry.unregister(this.offloadError); + CollectorRegistry.defaultRegistry.unregister(this.offloadRate); + CollectorRegistry.defaultRegistry.unregister(this.readLedgerLatency); + CollectorRegistry.defaultRegistry.unregister(this.writeStorageError); + CollectorRegistry.defaultRegistry.unregister(this.readOffloadError); + CollectorRegistry.defaultRegistry.unregister(this.readOffloadRate); + CollectorRegistry.defaultRegistry.unregister(this.readOffloadIndexLatency); + CollectorRegistry.defaultRegistry.unregister(this.readOffloadDataLatency); + this.topic2Namespace.clear(); + this.offloadAndReadOffloadBytesMap.clear(); + } + } + + @VisibleForTesting + public long getOffloadBytes(String topic) { + if (this.exposeTopicLevelMetrics) { + Pair pair = this.offloadAndReadOffloadBytesMap.get(topic); + return pair.getLeft().sum(); + } + + String namespace = this.topic2Namespace.get(topic); + List topics = this.offloadAndReadOffloadBytesMap.keySet().stream() + .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList()); + + long totalBytes = 0; + for (String key : topics) { + totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getLeft().sum(); + } + return totalBytes; + } + + @VisibleForTesting + public long getOffloadError(String topic) { + String[] labels = this.labelValues(topic); + return (long) this.offloadError.labels(labels).get(); + } + + @VisibleForTesting + public long getWriteStorageError(String topic) { + String[] labels = this.labelValues(topic); + return (long) this.writeStorageError.labels(labels).get(); + } + + @VisibleForTesting + public long getReadOffloadError(String topic) { + String[] labels = this.labelValues(topic); + return (long) this.readOffloadError.labels(labels).get(); + } + + @VisibleForTesting + public long getReadOffloadBytes(String topic) { + if (this.exposeTopicLevelMetrics) { + Pair pair = this.offloadAndReadOffloadBytesMap.get(topic); + return pair.getRight().sum(); + } + + String namespace = this.topic2Namespace.get(topic); + List topics = this.offloadAndReadOffloadBytesMap.keySet().stream() + .filter(topicName -> topicName.contains(namespace)).collect(Collectors.toList()); + + long totalBytes = 0; + for (String key : topics) { + totalBytes += this.offloadAndReadOffloadBytesMap.get(key).getRight().sum(); + } + return totalBytes; + } + + @VisibleForTesting + public Summary.Child.Value getReadLedgerLatency(String topic) { + String[] labels = this.labelValues(topic); + return this.readLedgerLatency.labels(labels).get(); + } + + @VisibleForTesting + public Summary.Child.Value getReadOffloadIndexLatency(String topic) { + String[] labels = this.labelValues(topic); + return this.readOffloadIndexLatency.labels(labels).get(); + } + + @VisibleForTesting + public Summary.Child.Value getReadOffloadDataLatency(String topic) { + String[] labels = this.labelValues(topic); + return this.readOffloadDataLatency.labels(labels).get(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 76b0a624037c3..35cd22d4bbe3f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -73,6 +73,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.offload.Offloaders; @@ -204,6 +205,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private OrderedScheduler offloaderScheduler; private OffloadersCache offloadersCache = new OffloadersCache(); private LedgerOffloader defaultOffloader; + private final LedgerOffloaderStats offloaderStats; private Map ledgerOffloaderMap = new ConcurrentHashMap<>(); private ScheduledFuture loadReportTask = null; private ScheduledFuture loadSheddingTask = null; @@ -336,6 +338,11 @@ public PulsarService(ServiceConfiguration config, new ExecutorProvider(1, "broker-client-shared-external-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + + int interval = config.getManagedLedgerStatsPeriodSeconds(); + boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus(); + this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(), + exposeTopicMetrics, this.getOffloaderScheduler(), interval); } public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException { @@ -530,6 +537,9 @@ public CompletableFuture closeAsync() { if (transactionExecutorProvider != null) { transactionExecutorProvider.shutdownNow(); } + if (this.offloaderStats != null) { + this.offloaderStats.close(); + } brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); @@ -1259,6 +1269,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies checkNotNull(offloadPolicies.getOffloadersDirectory(), "Offloader driver is configured to be '%s' but no offloaders directory is configured.", offloadPolicies.getManagedLedgerOffloadDriver()); + Offloaders offloaders = offloadersCache.getOrLoadOffloaders( offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory()); @@ -1272,8 +1283,7 @@ public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha(), LedgerOffloader.METADATA_PULSAR_CLUSTER_NAME.toLowerCase(), config.getClusterName() ), - schemaStorage, - getOffloaderScheduler(offloadPolicies)); + schemaStorage, getOffloaderScheduler(offloadPolicies), this.offloaderStats); } catch (IOException ioe) { throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index e0235817f0c4a..bf2884e2b3a1a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.StatsProvider; import org.apache.pulsar.PulsarVersion; @@ -53,6 +54,7 @@ * in a text format suitable to be consumed by Prometheus. * Format specification can be found at Exposition Formats */ +@Slf4j public class PrometheusMetricsGenerator { static { @@ -199,13 +201,16 @@ private static void parseMetricsToPrometheusMetrics(Collection metrics, .write("{cluster=\"").write(cluster).write('"'); } + //to avoid quantile label duplicated + boolean appendedQuantile = false; for (Map.Entry metric : metrics1.getDimensions().entrySet()) { if (metric.getKey().isEmpty() || "cluster".equals(metric.getKey())) { continue; } stream.write(", ").write(metric.getKey()).write("=\"").write(metric.getValue()).write('"'); - if (value != null && !value.isEmpty()) { + if (value != null && !value.isEmpty() && !appendedQuantile) { stream.write(", ").write("quantile=\"").write(value).write('"'); + appendedQuantile = true; } } stream.write("} ").write(String.valueOf(entry.getValue())) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java new file mode 100644 index 0000000000000..c6c8f07b8684d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/LedgerOffloaderMetricsTest.java @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.List; +import java.util.Optional; +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.junit.Assert; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.Test; + +public class LedgerOffloaderMetricsTest extends BrokerTestBase { + + @Override + protected void setup() throws Exception { + } + + @AfterMethod(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testTopicLevelMetrics() throws Exception { + conf.setExposeTopicLevelMetricsInPrometheus(true); + super.baseSetup(); + + String ns1 = "prop/ns-abc1"; + admin.namespaces().createNamespace(ns1); + String []topics = new String[3]; + + LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats(); + + LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class); + Topic topic = Mockito.mock(PersistentTopic.class); + CompletableFuture> topicFuture = new CompletableFuture<>(); + Optional topicOptional = Optional.of(topic); + topicFuture.complete(topicOptional); + BrokerService brokerService = spy(pulsar.getBrokerService()); + doReturn(brokerService).when(pulsar).getBrokerService(); + + + for (int i = 0; i < 3; i++) { + String topicName = "persistent://prop/ns-abc1/testMetrics" + UUID.randomUUID(); + topics[i] = topicName; + admin.topics().createNonPartitionedTopic(topicName); + + doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName); + Assert.assertTrue(topic instanceof PersistentTopic); + + ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class); + doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger(); + ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class); + doReturn(config).when(ledgerM).getConfig(); + doReturn(offloader).when(config).getLedgerOffloader(); + + offloaderStats.recordOffloadError(topicName); + offloaderStats.recordOffloadError(topicName); + offloaderStats.recordOffloadBytes(topicName, 100); + offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadError(topicName); + offloaderStats.recordReadOffloadError(topicName); + offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadBytes(topicName, 100000); + offloaderStats.recordWriteToStorageError(topicName); + offloaderStats.recordWriteToStorageError(topicName); + } + + for (String topicName : topics) { + Assert.assertEquals(offloaderStats.getOffloadError(topicName), 2); + Assert.assertEquals(offloaderStats.getOffloadBytes(topicName) , 100); + Assert.assertEquals((long) offloaderStats.getReadLedgerLatency(topicName).sum, 1); + Assert.assertEquals(offloaderStats.getReadOffloadError(topicName), 2); + Assert.assertEquals((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum ,1000); + Assert.assertEquals(offloaderStats.getReadOffloadBytes(topicName), 100000); + Assert.assertEquals(offloaderStats.getWriteStorageError(topicName), 2); + } + } + + @Test + public void testNamespaceLevelMetrics() throws Exception { + conf.setExposeTopicLevelMetricsInPrometheus(false); + super.baseSetup(); + + String ns1 = "prop/ns-abc1"; + String ns2 = "prop/ns-abc2"; + + LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) pulsar.getOffloaderStats(); + + LedgerOffloader offloader = Mockito.mock(LedgerOffloader.class); + Topic topic = Mockito.mock(PersistentTopic.class); + CompletableFuture> topicFuture = new CompletableFuture<>(); + Optional topicOptional = Optional.of(topic); + topicFuture.complete(topicOptional); + BrokerService brokerService = spy(pulsar.getBrokerService()); + doReturn(brokerService).when(pulsar).getBrokerService(); + Queue queue = new LinkedList<>(); + Map> namespace2Topics = new HashMap<>(); + for (int s = 0; s < 2; s++) { + String nameSpace = ns1; + if (s == 1) { + nameSpace = ns2; + } + namespace2Topics.put(nameSpace, new ArrayList<>()); + + admin.namespaces().createNamespace(nameSpace); + String baseTopic1 = "persistent://" + nameSpace + "/testMetrics"; + for (int i = 0; i < 6; i++) { + String topicName = baseTopic1 + UUID.randomUUID(); + List topicList = namespace2Topics.get(nameSpace); + topicList.add(topicName); + + queue.add(topicName); + admin.topics().createNonPartitionedTopic(topicName); + doReturn(topicFuture).when(brokerService).getTopicIfExists(topicName); + Assert.assertTrue(topic instanceof PersistentTopic); + + + ManagedLedger ledgerM = Mockito.mock(ManagedLedger.class); + doReturn(ledgerM).when(((PersistentTopic) topic)).getManagedLedger(); + ManagedLedgerConfig config = Mockito.mock(ManagedLedgerConfig.class); + doReturn(config).when(ledgerM).getConfig(); + doReturn(offloader).when(config).getLedgerOffloader(); + Mockito.when(ledgerM.getName()).thenAnswer((Answer) invocation -> queue.poll()); + + offloaderStats.recordOffloadError(topicName); + offloaderStats.recordOffloadBytes(topicName, 100); + offloaderStats.recordReadLedgerLatency(topicName, 1000, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadError(topicName); + offloaderStats.recordReadOffloadIndexLatency(topicName, 1000000L, TimeUnit.NANOSECONDS); + offloaderStats.recordReadOffloadBytes(topicName, 100000); + offloaderStats.recordWriteToStorageError(topicName); + } + } + + for (Map.Entry> entry : namespace2Topics.entrySet()) { + String namespace = entry.getKey(); + List topics = entry.getValue(); + String topicName = topics.get(0); + + Assert.assertTrue(offloaderStats.getOffloadError(topicName) >= 1); + Assert.assertTrue(offloaderStats.getOffloadBytes(topicName) >= 100); + Assert.assertTrue((long) offloaderStats.getReadLedgerLatency(topicName).sum >= 1); + Assert.assertTrue(offloaderStats.getReadOffloadError(topicName) >= 1); + Assert.assertTrue((long) offloaderStats.getReadOffloadIndexLatency(topicName).sum >= 1000); + Assert.assertTrue(offloaderStats.getReadOffloadBytes(topicName) >= 100000); + Assert.assertTrue(offloaderStats.getWriteStorageError(topicName) >= 1); + } + } + +} diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 9a64c055d07ac..f6a4771f9e41e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -30,6 +30,7 @@ import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; @@ -61,6 +62,7 @@ public class PulsarConnectorCache { private final StatsProvider statsProvider; private OrderedScheduler offloaderScheduler; + private final LedgerOffloaderStats offloaderStats; private OffloadersCache offloadersCache = new OffloadersCache(); private LedgerOffloader defaultOffloader; private Map offloaderMap = new ConcurrentHashMap<>(); @@ -84,6 +86,14 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws this.statsProvider.start(clientConfiguration); + this.initOffloaderScheduler(pulsarConnectorConfig.getOffloadPolices()); + + int period = pulsarConnectorConfig.getManagedLedgerStatsPeriodSeconds(); + boolean exposeTopicLevelMetrics = pulsarConnectorConfig.isExposeTopicLevelMetricsInPrometheus(); + this.offloaderStats = + LedgerOffloaderStats.create(pulsarConnectorConfig.isExposeManagedLedgerMetricsInPrometheus(), + exposeTopicLevelMetrics, offloaderScheduler, period); + this.defaultOffloader = initManagedLedgerOffloader( pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig); } @@ -142,13 +152,10 @@ public ManagedLedgerConfig getManagedLedgerConfig(NamespaceName namespaceName, O return managedLedgerConfig; } - private synchronized OrderedScheduler getOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { - if (this.offloaderScheduler == null) { + private void initOffloaderScheduler(OffloadPoliciesImpl offloadPolicies) { this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) .name("pulsar-offloader").build(); - } - return this.offloaderScheduler; } private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPolicies, @@ -171,7 +178,7 @@ private LedgerOffloader initManagedLedgerOffloader(OffloadPoliciesImpl offloadPo LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() ), - getOffloaderScheduler(offloadPolicies)); + this.offloaderScheduler, this.offloaderStats); } catch (IOException ioe) { log.error("Failed to create offloader: ", ioe); throw new RuntimeException(ioe.getMessage(), ioe.getCause()); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index abaf9a6fbceb6..0e5c2b4e95f24 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -69,6 +69,11 @@ public class PulsarConnectorConfig implements AutoCloseable { private String offloadersDirectory = "./offloaders"; private Map offloaderProperties = new HashMap<>(); + //--- Ledger metrics --- + private boolean exposeTopicLevelMetricsInPrometheus = false; + private boolean exposeManagedLedgerMetricsInPrometheus = false; + private int managedLedgerStatsPeriodSeconds = 60; + private PulsarAdmin pulsarAdmin; // --- Bookkeeper @@ -277,6 +282,37 @@ public PulsarConnectorConfig setOffloaderProperties(String offloaderProperties) return this; } + @Config("pulsar.expose-topic-level-metrics-in-prometheus") + public PulsarConnectorConfig setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) { + this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus; + return this; + } + + public boolean isExposeTopicLevelMetricsInPrometheus() { + return exposeTopicLevelMetricsInPrometheus; + } + + @Config("pulsar.expose-managed-ledger-metrics-in-prometheus") + public PulsarConnectorConfig setExposeManagedLedgerMetricsInPrometheus( + boolean exposeManagedLedgerMetricsInPrometheus) { + this.exposeManagedLedgerMetricsInPrometheus = exposeManagedLedgerMetricsInPrometheus; + return this; + } + + public boolean isExposeManagedLedgerMetricsInPrometheus() { + return exposeManagedLedgerMetricsInPrometheus; + } + + @Config("pulsar.managed-ledger-stats-period-seconds") + public PulsarConnectorConfig setManagedLedgerStatsPeriodSeconds(int managedLedgerStatsPeriodSeconds) { + this.managedLedgerStatsPeriodSeconds = managedLedgerStatsPeriodSeconds; + return this; + } + + public int getManagedLedgerStatsPeriodSeconds() { + return managedLedgerStatsPeriodSeconds; + } + // --- Authentication --- public String getAuthPlugin() { diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java index 4b32f77bc1fd8..c7876f9941a84 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/FileSystemLedgerOffloaderFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; @@ -34,7 +35,8 @@ public boolean isDriverSupported(String driverName) { @Override public FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map userMetadata, - OrderedScheduler scheduler) throws IOException { - return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler); + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { + return FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats); } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java index 5221b15d5d9c6..ecc0cfc23b37c 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileStoreBackedReadHandleImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -47,18 +49,25 @@ public class FileStoreBackedReadHandleImpl implements ReadHandle { private final MapFile.Reader reader; private final long ledgerId; private final LedgerMetadata ledgerMetadata; + private final LedgerOffloaderStats offloaderStats; + private final String managedLedgerName; - private FileStoreBackedReadHandleImpl(ExecutorService executor, - MapFile.Reader reader, - long ledgerId) throws IOException { + private FileStoreBackedReadHandleImpl(ExecutorService executor, MapFile.Reader reader, long ledgerId, + LedgerOffloaderStats offloaderStats, + String managedLedgerName) throws IOException { this.ledgerId = ledgerId; this.executor = executor; this.reader = reader; + this.offloaderStats = offloaderStats; + this.managedLedgerName = managedLedgerName; LongWritable key = new LongWritable(); BytesWritable value = new BytesWritable(); try { key.set(FileSystemManagedLedgerOffloader.METADATA_KEY_INDEX); + long startReadIndexTime = System.nanoTime(); reader.get(key, value); + offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, + System.nanoTime() - startReadIndexTime, TimeUnit.NANOSECONDS); this.ledgerMetadata = parseLedgerMetadata(ledgerId, value.copyBytes()); } catch (IOException e) { log.error("Fail to read LedgerMetadata for ledgerId {}", @@ -113,7 +122,10 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr key.set(nextExpectedId - 1); reader.seek(key); while (entriesToRead > 0) { + long startReadTime = System.nanoTime(); reader.next(key, value); + this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName, + System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS); int length = value.getLength(); long entryId = key.get(); if (entryId == nextExpectedId) { @@ -122,6 +134,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr buf.writeBytes(value.copyBytes()); entriesToRead--; nextExpectedId++; + this.offloaderStats.recordReadOffloadBytes(managedLedgerName, length); } else if (entryId > lastEntry) { log.info("Expected to read {}, but read {}, which is greater than last entry {}", nextExpectedId, entryId, lastEntry); @@ -130,6 +143,7 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr } promise.complete(LedgerEntriesImpl.create(entries)); } catch (Throwable t) { + this.offloaderStats.recordReadOffloadError(managedLedgerName); promise.completeExceptionally(t); entries.forEach(LedgerEntry::close); } @@ -176,9 +190,8 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn return promise; } - public static ReadHandle open(ScheduledExecutorService executor, - MapFile.Reader reader, - long ledgerId) throws IOException { - return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId); + public static ReadHandle open(ScheduledExecutorService executor, MapFile.Reader reader, long ledgerId, + LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException { + return new FileStoreBackedReadHandleImpl(executor, reader, ledgerId, offloaderStats, managedLedgerName); } } diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 02f1e62426af6..d91d84b476b39 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -29,12 +29,14 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.filesystem.FileSystemLedgerOffloaderFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -62,6 +64,7 @@ public class FileSystemManagedLedgerOffloader implements LedgerOffloader { private static final long ENTRIES_PER_READ = 100; private OrderedScheduler assignmentScheduler; private OffloadPoliciesImpl offloadPolicies; + private final LedgerOffloaderStats offloaderStats; public static boolean driverSupported(String driver) { return DRIVER_NAMES.equals(driver); @@ -73,11 +76,13 @@ public String getOffloadDriverName() { } public static FileSystemManagedLedgerOffloader create(OffloadPoliciesImpl conf, - OrderedScheduler scheduler) throws IOException { - return new FileSystemManagedLedgerOffloader(conf, scheduler); + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { + return new FileSystemManagedLedgerOffloader(conf, scheduler, offloaderStats); } - private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler) throws IOException { + private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { this.offloadPolicies = conf; this.configuration = new Configuration(); if (conf.getFileSystemProfilePath() != null) { @@ -105,13 +110,15 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder() .numThreads(conf.getManagedLedgerOffloadMaxThreads()) .name("offload-assignment").build(); + this.offloaderStats = offloaderStats; } @VisibleForTesting public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedScheduler scheduler, String testHDFSPath, - String baseDir) throws IOException { + String baseDir, + LedgerOffloaderStats offloaderStats) throws IOException { this.offloadPolicies = conf; this.configuration = new Configuration(); this.configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); @@ -125,6 +132,7 @@ public FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder() .numThreads(conf.getManagedLedgerOffloadMaxThreads()) .name("offload-assignment").build(); + this.offloaderStats = offloaderStats; } @Override @@ -142,8 +150,9 @@ public Map getOffloadDriverMetadata() { public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { CompletableFuture promise = new CompletableFuture<>(); scheduler.chooseThread(readHandle.getId()).submit( - new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, - configuration, assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds())); + new LedgerReader(readHandle, uuid, extraMetadata, promise, storageBasePath, configuration, + assignmentScheduler, offloadPolicies.getManagedLedgerOffloadPrefetchRounds(), + this.offloaderStats)); return promise; } @@ -158,6 +167,7 @@ private static class LedgerReader implements Runnable { volatile Exception fileSystemWriteException = null; private OrderedScheduler assignmentScheduler; private int managedLedgerOffloadPrefetchRounds = 1; + private final LedgerOffloaderStats offloaderStats; private LedgerReader(ReadHandle readHandle, UUID uuid, @@ -166,7 +176,8 @@ private LedgerReader(ReadHandle readHandle, String storageBasePath, Configuration configuration, OrderedScheduler assignmentScheduler, - int managedLedgerOffloadPrefetchRounds) { + int managedLedgerOffloadPrefetchRounds, + LedgerOffloaderStats offloaderStats) { this.readHandle = readHandle; this.uuid = uuid; this.extraMetadata = extraMetadata; @@ -175,6 +186,7 @@ private LedgerReader(ReadHandle readHandle, this.configuration = configuration; this.assignmentScheduler = assignmentScheduler; this.managedLedgerOffloadPrefetchRounds = managedLedgerOffloadPrefetchRounds; + this.offloaderStats = offloaderStats; } @Override @@ -185,7 +197,8 @@ public void run() { return; } long ledgerId = readHandle.getId(); - String storagePath = getStoragePath(storageBasePath, extraMetadata.get(MANAGED_LEDGER_NAME)); + final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME); + String storagePath = getStoragePath(storageBasePath, topicName); String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid); LongWritable key = new LongWritable(); BytesWritable value = new BytesWritable(); @@ -208,7 +221,10 @@ public void run() { long end = Math.min(needToOffloadFirstEntryNumber + ENTRIES_PER_READ - 1, readHandle.getLastAddConfirmed()); log.debug("read ledger entries. start: {}, end: {}", needToOffloadFirstEntryNumber, end); + long startReadTime = System.nanoTime(); LedgerEntries ledgerEntriesOnce = readHandle.readAsync(needToOffloadFirstEntryNumber, end).get(); + long cost = System.nanoTime() - startReadTime; + this.offloaderStats.recordReadLedgerLatency(topicName, cost, TimeUnit.NANOSECONDS); semaphore.acquire(); countDownLatch = new CountDownLatch(1); assignmentScheduler.chooseThread(ledgerId) @@ -225,10 +241,11 @@ public void run() { promise.complete(null); } catch (Exception e) { log.error("Exception when get CompletableFuture : ManagerLedgerName: {}, " - + "LedgerId: {}, UUID: {} ", extraMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, e); + + "LedgerId: {}, UUID: {} ", topicName, ledgerId, uuid, e); if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } + this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(e); } } @@ -288,6 +305,7 @@ public static FileSystemWriter create(LedgerEntries ledgerEntriesOnce, @Override public void run() { + String managedLedgerName = ledgerReader.extraMetadata.get(MANAGED_LEDGER_NAME); if (ledgerReader.fileSystemWriteException == null) { Iterator iterator = ledgerEntriesOnce.iterator(); while (iterator.hasNext()) { @@ -299,9 +317,11 @@ public void run() { dataWriter.append(key, value); } catch (IOException e) { ledgerReader.fileSystemWriteException = e; + ledgerReader.offloaderStats.recordWriteToStorageError(managedLedgerName); break; } haveOffloadEntryNumber.incrementAndGet(); + ledgerReader.offloaderStats.recordOffloadBytes(managedLedgerName, entry.getLength()); } } countDownLatch.countDown(); @@ -315,19 +335,19 @@ public void run() { public CompletableFuture readOffloaded(long ledgerId, UUID uuid, Map offloadDriverMetadata) { + final String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME); CompletableFuture promise = new CompletableFuture<>(); - String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)); + String storagePath = getStoragePath(storageBasePath, ledgerName); String dataFilePath = getDataFilePath(storagePath, ledgerId, uuid); scheduler.chooseThread(ledgerId).submit(() -> { try { MapFile.Reader reader = new MapFile.Reader(new Path(dataFilePath), configuration); - promise.complete(FileStoreBackedReadHandleImpl.open(scheduler.chooseThread(ledgerId), - reader, - ledgerId)); + promise.complete(FileStoreBackedReadHandleImpl.open( + scheduler.chooseThread(ledgerId), reader, ledgerId, this.offloaderStats, ledgerName)); } catch (Throwable t) { log.error("Failed to open FileStoreBackedReadHandleImpl: ManagerLedgerName: {}, " - + "LegerId: {}, UUID: {}", offloadDriverMetadata.get(MANAGED_LEDGER_NAME), ledgerId, uuid, t); + + "LegerId: {}, UUID: {}", ledgerName, ledgerId, uuid, t); promise.completeExceptionally(t); } }); @@ -344,7 +364,8 @@ private static String getDataFilePath(String storagePath, long ledgerId, UUID uu @Override public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map offloadDriverMetadata) { - String storagePath = getStoragePath(storageBasePath, offloadDriverMetadata.get(MANAGED_LEDGER_NAME)); + String ledgerName = offloadDriverMetadata.get(MANAGED_LEDGER_NAME); + String storagePath = getStoragePath(storageBasePath, ledgerName); String dataFilePath = getDataFilePath(storagePath, ledgerId, uid); CompletableFuture promise = new CompletableFuture<>(); try { @@ -354,7 +375,8 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, Map + this.offloaderStats.recordDeleteOffloadOps(ledgerName, t == null)); } @Override diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java index 342bce601e876..b3a02a4f7f618 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/FileStoreTestBase.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.offload.filesystem; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.filesystem.impl.FileSystemManagedLedgerOffloader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -30,6 +31,7 @@ import java.io.File; import java.nio.file.Files; import java.util.Properties; +import java.util.concurrent.Executors; public abstract class FileStoreTestBase { protected FileSystemManagedLedgerOffloader fileSystemManagedLedgerOffloader; @@ -37,6 +39,7 @@ public abstract class FileStoreTestBase { protected final String basePath = "pulsar"; private MiniDFSCluster hdfsCluster; private String hdfsURI; + protected LedgerOffloaderStats offloaderStats; @BeforeMethod(alwaysRun = true) public void start() throws Exception { @@ -48,9 +51,10 @@ public void start() throws Exception { hdfsURI = "hdfs://localhost:"+ hdfsCluster.getNameNodePort() + "/"; Properties properties = new Properties(); + this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60); fileSystemManagedLedgerOffloader = new FileSystemManagedLedgerOffloader( OffloadPoliciesImpl.create(properties), - scheduler, hdfsURI, basePath); + scheduler, hdfsURI, basePath, offloaderStats); } @AfterMethod(alwaysRun = true) diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java index 3600e9e5c576e..3f9dbb35551ad 100644 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java +++ b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloaderTest.java @@ -27,12 +27,13 @@ import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl; import org.apache.bookkeeper.mledger.offload.filesystem.FileStoreTestBase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; - import java.net.URI; import java.util.HashMap; import java.util.Iterator; @@ -45,7 +46,7 @@ public class FileSystemManagedLedgerOffloaderTest extends FileStoreTestBase { private final PulsarMockBookKeeper bk; - private String topic = "public/default/persistent/testOffload"; + private String topic = "public/default/testOffload"; private String storagePath = createStoragePath(topic); private LedgerHandle lh; private ReadHandle toWrite; @@ -76,6 +77,12 @@ private ReadHandle buildReadHandle() throws Exception { .withPassword("foobar".getBytes()).withDigestType(DigestType.CRC32).execute().get(); } + @BeforeMethod(alwaysRun = true) + @Override + public void start() throws Exception { + super.start(); + } + @Test public void testOffloadAndRead() throws Exception { LedgerOffloader offloader = fileSystemManagedLedgerOffloader; @@ -111,6 +118,31 @@ public void testOffloadAndRead() throws Exception { } } + @Test + public void testOffloadAndReadMetrics() throws Exception { + LedgerOffloader offloader = fileSystemManagedLedgerOffloader; + UUID uuid = UUID.randomUUID(); + offloader.offload(toWrite, uuid, map).get(); + + LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats; + assertTrue(offloaderStats.getOffloadError(topic) == 0); + assertTrue(offloaderStats.getOffloadBytes(topic) > 0); + assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0); + assertTrue(offloaderStats.getWriteStorageError(topic) == 0); + + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get(); + LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1); + Iterator toTestIter = toTestEntries.iterator(); + while (toTestIter.hasNext()) { + LedgerEntry toTestEntry = toTestIter.next(); + } + + assertTrue(offloaderStats.getReadOffloadError(topic) == 0); + assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0); + assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0); + assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0); + } + @Test public void testDeleteOffload() throws Exception { LedgerOffloader offloader = fileSystemManagedLedgerOffloader; diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java index c74799ebac06f..c7293696c2f32 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/JCloudLedgerOffloaderFactory.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreManagedLedgerOffloader; import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider; import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration; @@ -45,10 +46,11 @@ public boolean isDriverSupported(String driverName) { @Override public BlobStoreManagedLedgerOffloader create(OffloadPoliciesImpl offloadPolicies, Map userMetadata, - OrderedScheduler scheduler) throws IOException { + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { TieredStorageConfiguration config = TieredStorageConfiguration.create(offloadPolicies.toProperties()); - return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler); + return BlobStoreManagedLedgerOffloader.create(config, userMetadata, scheduler, offloaderStats); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java index e905884ddd054..61d8d0606f38e 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedInputStreamImpl.java @@ -21,6 +21,8 @@ import io.netty.buffer.ByteBuf; import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.impl.DataBlockUtils.VersionCheck; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -40,6 +42,8 @@ public class BlobStoreBackedInputStreamImpl extends BackedInputStream { private final ByteBuf buffer; private final long objectLen; private final int bufferSize; + private LedgerOffloaderStats offloaderStats; + private String managedLedgerName; private long cursor; private long bufferOffsetStart; @@ -59,6 +63,16 @@ public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String this.bufferOffsetStart = this.bufferOffsetEnd = -1; } + + public BlobStoreBackedInputStreamImpl(BlobStore blobStore, String bucket, String key, + VersionCheck versionCheck, + long objectLen, int bufferSize, + LedgerOffloaderStats offloaderStats, String managedLedgerName) { + this(blobStore, bucket, key, versionCheck, objectLen, bufferSize); + this.offloaderStats = offloaderStats; + this.managedLedgerName = managedLedgerName; + } + /** * Refill the buffered input if it is empty. * @return true if there are bytes to read, false otherwise @@ -73,7 +87,13 @@ private boolean refillBufferIfNeeded() throws IOException { objectLen - 1); try { + long startReadTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange)); + if (this.offloaderStats != null) { + this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName, + System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS); + this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1); + } versionCheck.check(key, blob); try (InputStream stream = blob.getPayload().openStream()) { @@ -88,6 +108,9 @@ private boolean refillBufferIfNeeded() throws IOException { cursor += buffer.readableBytes(); } } catch (Throwable e) { + if (null != this.offloaderStats) { + this.offloaderStats.recordReadOffloadError(this.managedLedgerName); + } throw new IOException("Error reading from BlobStore", e); } } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java index e129a11fe43ab..998912a30c405 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; import org.apache.bookkeeper.client.api.LedgerEntries; @@ -35,6 +36,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; @@ -62,8 +64,7 @@ enum State { private State state = null; private BlobStoreBackedReadHandleImpl(long ledgerId, OffloadIndexBlock index, - BackedInputStream inputStream, - ExecutorService executor) { + BackedInputStream inputStream, ExecutorService executor) { this.ledgerId = ledgerId; this.index = index; this.inputStream = inputStream; @@ -222,7 +223,8 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn public static ReadHandle open(ScheduledExecutorService executor, BlobStore blobStore, String bucket, String key, String indexKey, VersionCheck versionCheck, - long ledgerId, int readBufferSize) + long ledgerId, int readBufferSize, + LedgerOffloaderStats offloaderStats, String managedLedgerName) throws IOException { int retryCount = 3; OffloadIndexBlock index = null; @@ -233,7 +235,10 @@ public static ReadHandle open(ScheduledExecutorService executor, // If we use a backoff to control the retry, it will introduce a concurrent operation. // We don't want to make it complicated, because in the most of case it shouldn't in the retry loop. while (retryCount-- > 0) { + long readIndexStartTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, indexKey); + offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, + System.nanoTime() - readIndexStartTime, TimeUnit.NANOSECONDS); versionCheck.check(indexKey, blob); OffloadIndexBlockBuilder indexBuilder = OffloadIndexBlockBuilder.create(); try (InputStream payLoadStream = blob.getPayload().openStream()) { @@ -253,9 +258,7 @@ public static ReadHandle open(ScheduledExecutorService executor, } BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, - versionCheck, - index.getDataObjectLength(), - readBufferSize); + versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName); return new BlobStoreBackedReadHandleImpl(ledgerId, index, inputStream, executor); } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java index a6f66132c4af1..e1a8296a20e9c 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java @@ -29,6 +29,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import lombok.val; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LastConfirmedAndEntry; @@ -38,6 +39,7 @@ import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2; import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder; @@ -274,7 +276,8 @@ public CompletableFuture readLastAddConfirmedAndEntryAsyn public static ReadHandle open(ScheduledExecutorService executor, BlobStore blobStore, String bucket, List keys, List indexKeys, VersionCheck versionCheck, - long ledgerId, int readBufferSize) + long ledgerId, int readBufferSize, LedgerOffloaderStats offloaderStats, + String managedLedgerName) throws IOException { List inputStreams = new LinkedList<>(); List indice = new LinkedList<>(); @@ -282,7 +285,10 @@ public static ReadHandle open(ScheduledExecutorService executor, String indexKey = indexKeys.get(i); String key = keys.get(i); log.debug("open bucket: {} index key: {}", bucket, indexKey); + long startTime = System.nanoTime(); Blob blob = blobStore.getBlob(bucket, indexKey); + offloaderStats.recordReadOffloadIndexLatency(managedLedgerName, + System.nanoTime() - startTime, TimeUnit.NANOSECONDS); log.debug("indexKey blob: {} {}", indexKey, blob); versionCheck.check(indexKey, blob); OffloadIndexBlockV2Builder indexBuilder = OffloadIndexBlockV2Builder.create(); @@ -292,9 +298,7 @@ public static ReadHandle open(ScheduledExecutorService executor, } BackedInputStream inputStream = new BlobStoreBackedInputStreamImpl(blobStore, bucket, key, - versionCheck, - index.getDataObjectLength(), - readBufferSize); + versionCheck, index.getDataObjectLength(), readBufferSize, offloaderStats, managedLedgerName); inputStreams.add(inputStream); indice.add(index); } diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index 9aa7ecde66070..1fdc0983b0e89 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -45,6 +45,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle.OfferEntryResult; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata; @@ -91,6 +92,8 @@ @Slf4j public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { + private static final String MANAGED_LEDGER_NAME = "ManagedLedgerName"; + private final OrderedScheduler scheduler; private final TieredStorageConfiguration config; private final Location writeLocation; @@ -113,16 +116,18 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { private final int streamingBlockSize; private volatile ManagedLedger ml; private OffloadIndexBlockV2Builder streamingIndexBuilder; + private final LedgerOffloaderStats offloaderStats; public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config, Map userMetadata, - OrderedScheduler scheduler) throws IOException { + OrderedScheduler scheduler, + LedgerOffloaderStats offloaderStats) throws IOException { - return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata); + return new BlobStoreManagedLedgerOffloader(config, scheduler, userMetadata, offloaderStats); } BlobStoreManagedLedgerOffloader(TieredStorageConfiguration config, OrderedScheduler scheduler, - Map userMetadata) { + Map userMetadata, LedgerOffloaderStats offloaderStats) { this.scheduler = scheduler; this.userMetadata = userMetadata; @@ -149,6 +154,7 @@ public static BlobStoreManagedLedgerOffloader create(TieredStorageConfiguration config.getBucket(), config.getRegion()); blobStores.putIfAbsent(config.getBlobStoreLocation(), config.getBlobStore()); + this.offloaderStats = offloaderStats; log.info("The ledger offloader was created."); } @@ -170,6 +176,7 @@ public Map getOffloadDriverMetadata() { public CompletableFuture offload(ReadHandle readHandle, UUID uuid, Map extraMetadata) { + final String topicName = extraMetadata.get(MANAGED_LEDGER_NAME); final BlobStore writeBlobStore = blobStores.get(config.getBlobStoreLocation()); log.info("offload {} uuid {} extraMetadata {} to {} {}", readHandle.getId(), uuid, extraMetadata, config.getBlobStoreLocation(), writeBlobStore); @@ -212,13 +219,14 @@ public CompletableFuture offload(ReadHandle readHandle, try { long startEntry = 0; int partId = 1; + long start = System.nanoTime(); long entryBytesWritten = 0; while (startEntry <= readHandle.getLastAddConfirmed()) { int blockSize = BlockAwareSegmentInputStreamImpl .calculateBlockSize(config.getMaxBlockSizeInBytes(), readHandle, startEntry, entryBytesWritten); try (BlockAwareSegmentInputStream blockStream = new BlockAwareSegmentInputStreamImpl( - readHandle, startEntry, blockSize)) { + readHandle, startEntry, blockSize, this.offloaderStats, topicName)) { Payload partPayload = Payloads.newInputStreamPayload(blockStream); partPayload.getContentMetadata().setContentLength((long) blockSize); @@ -237,6 +245,7 @@ public CompletableFuture offload(ReadHandle readHandle, } entryBytesWritten += blockStream.getBlockEntryBytesCount(); partId++; + this.offloaderStats.recordOffloadBytes(topicName, blockStream.getBlockEntryBytesCount()); } dataObjectLength += blockSize; @@ -254,6 +263,8 @@ public CompletableFuture offload(ReadHandle readHandle, log.error("Failed abortMultipartUpload in bucket - {} with key - {}, uploadId - {}.", config.getBucket(), dataBlockKey, mpu.id(), throwable); } + this.offloaderStats.recordWriteToStorageError(topicName); + this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(t); return; } @@ -277,7 +288,6 @@ public CompletableFuture offload(ReadHandle readHandle, .payload(indexPayload) .contentLength((long) indexStream.getStreamSize()) .build(); - writeBlobStore.putBlob(config.getBucket(), blob); promise.complete(null); } catch (Throwable t) { @@ -287,6 +297,9 @@ public CompletableFuture offload(ReadHandle readHandle, log.error("Failed deleteObject in bucket - {} with key - {}.", config.getBucket(), dataBlockKey, throwable); } + + this.offloaderStats.recordWriteToStorageError(topicName); + this.offloaderStats.recordOffloadError(topicName); promise.completeExceptionally(t); return; } @@ -532,7 +545,8 @@ public CompletableFuture readOffloaded(long ledgerId, UUID uid, readBlobstore, readBucket, key, indexKey, DataBlockUtils.VERSION_CHECK, - ledgerId, config.getReadBufferSizeInBytes())); + ledgerId, config.getReadBufferSizeInBytes(), + this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME))); } catch (Throwable t) { log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); @@ -565,7 +579,8 @@ public CompletableFuture readOffloaded(long ledgerId, MLDataFormats. readBlobstore, readBucket, keys, indexKeys, DataBlockUtils.VERSION_CHECK, - ledgerId, config.getReadBufferSizeInBytes())); + ledgerId, config.getReadBufferSizeInBytes(), + this.offloaderStats, offloadDriverMetadata.get(MANAGED_LEDGER_NAME))); } catch (Throwable t) { log.error("Failed readOffloaded: ", t); promise.completeExceptionally(t); @@ -594,7 +609,11 @@ public CompletableFuture deleteOffloaded(long ledgerId, UUID uid, } }); - return promise; + return promise.whenComplete((__, t) -> { + if (null != this.ml) { + this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null); + } + }); } @Override @@ -616,7 +635,8 @@ public CompletableFuture deleteOffloaded(UUID uid, Map off } }); - return promise; + return promise.whenComplete((__, t) -> + this.offloaderStats.recordDeleteOffloadOps(this.ml.getName(), t == null)); } @Override diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java index a4ffdea65098f..da1f92438f0fb 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlockAwareSegmentInputStreamImpl.java @@ -27,9 +27,11 @@ import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.offload.jcloud.BlockAwareSegmentInputStream; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.slf4j.Logger; @@ -65,6 +67,8 @@ public class BlockAwareSegmentInputStreamImpl extends BlockAwareSegmentInputStre static final int ENTRY_HEADER_SIZE = 4 /* entry size */ + 8 /* entry id */; // Keep a list of all entries ByteBuf, each ByteBuf contains 2 buf: entry header and entry content. private List entriesByteBuf = null; + private LedgerOffloaderStats offloaderStats; + private String topicName; public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize) { this.ledger = ledger; @@ -76,6 +80,13 @@ public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, in this.entriesByteBuf = Lists.newLinkedList(); } + public BlockAwareSegmentInputStreamImpl(ReadHandle ledger, long startEntryId, int blockSize, + LedgerOffloaderStats offloaderStats, String ledgerName) { + this(ledger, startEntryId, blockSize); + this.offloaderStats = offloaderStats; + this.topicName = ledgerName; + } + // read ledger entries. private int readEntries() throws IOException { checkState(bytesReadOffset >= DataBlockHeaderImpl.getDataStartOffset()); @@ -113,11 +124,18 @@ private int readEntries() throws IOException { private List readNextEntriesFromLedger(long start, long maxNumberEntries) throws IOException { long end = Math.min(start + maxNumberEntries - 1, ledger.getLastAddConfirmed()); + long startTime = System.nanoTime(); try (LedgerEntries ledgerEntriesOnce = ledger.readAsync(start, end).get()) { - log.debug("read ledger entries. start: {}, end: {}", start, end); + if (log.isDebugEnabled()) { + log.debug("read ledger entries. start: {}, end: {} cost {}", start, end, + TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startTime)); + } + if (offloaderStats != null && topicName != null) { + offloaderStats.recordReadLedgerLatency(topicName, System.nanoTime() - startTime, + TimeUnit.NANOSECONDS); + } List entries = Lists.newLinkedList(); - Iterator iterator = ledgerEntriesOnce.iterator(); while (iterator.hasNext()) { LedgerEntry entry = iterator.next(); diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java index 32ad4980f4683..952e758f0e014 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderStreamingTest.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloader.OffloadHandle; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider; @@ -52,6 +53,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderStreamingTest.class); private TieredStorageConfiguration mockedConfig; private static final Random random = new Random(); + private final LedgerOffloaderStats offloaderStats; BlobStoreManagedLedgerOffloaderStreamingTest() throws Exception { super(); @@ -60,6 +62,7 @@ public class BlobStoreManagedLedgerOffloaderStreamingTest extends BlobStoreManag assertNotNull(provider); provider.validate(config); blobStore = provider.getBlobStore(config); + this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); } private BlobStoreManagedLedgerOffloader getOffloader(Map additionalConfig) throws IOException { @@ -76,7 +79,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, Map(), scheduler); + .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); return offloader; } @@ -85,7 +88,7 @@ private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mo mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket, additionalConfig))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader - .create(mockedConfig, new HashMap(), scheduler); + .create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); return offloader; } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java index c607e79ddc2f4..74d03b11dbaf0 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java @@ -23,10 +23,9 @@ import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.fail; - +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -38,12 +37,14 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - +import java.util.concurrent.Executors; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.LedgerOffloader; +import org.apache.bookkeeper.mledger.LedgerOffloaderStats; +import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl; import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata; import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider; import org.apache.bookkeeper.mledger.offload.jcloud.provider.TieredStorageConfiguration; @@ -60,6 +61,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO private static final Logger log = LoggerFactory.getLogger(BlobStoreManagedLedgerOffloaderTest.class); private TieredStorageConfiguration mockedConfig; + private final LedgerOffloaderStats offloaderStats; BlobStoreManagedLedgerOffloaderTest() throws Exception { super(); @@ -68,6 +70,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO assertNotNull(provider); provider.validate(config); blobStore = provider.getBlobStore(config); + this.offloaderStats = LedgerOffloaderStats.create(true, true, Executors.newScheduledThreadPool(1), 60); } private BlobStoreManagedLedgerOffloader getOffloader() throws IOException { @@ -81,14 +84,14 @@ private BlobStoreManagedLedgerOffloader getOffloader(BlobStore mockedBlobStore) private BlobStoreManagedLedgerOffloader getOffloader(String bucket) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(blobStore).when(mockedConfig).getBlobStore(); // Use the REAL blobStore - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler); + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); return offloader; } private BlobStoreManagedLedgerOffloader getOffloader(String bucket, BlobStore mockedBlobStore) throws IOException { mockedConfig = mock(TieredStorageConfiguration.class, delegatesTo(getConfiguration(bucket))); Mockito.doReturn(mockedBlobStore).when(mockedConfig).getBlobStore(); - BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler); + BlobStoreManagedLedgerOffloader offloader = BlobStoreManagedLedgerOffloader.create(mockedConfig, new HashMap(), scheduler, this.offloaderStats); return offloader; } @@ -162,6 +165,41 @@ public void testReadHandlerState() throws Exception { Assert.assertEquals(toTest.getState(), BlobStoreBackedReadHandleImpl.State.Closed); } + @Test(timeOut = 600000) // 10 minutes. + public void testOffloadAndReadMetrics() throws Exception { + ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 3); + LedgerOffloader offloader = getOffloader(); + + UUID uuid = UUID.randomUUID(); + + String topic = "test"; + Map extraMap = new HashMap<>(); + extraMap.put("ManagedLedgerName", topic); + offloader.offload(toWrite, uuid, extraMap).get(); + + LedgerOffloaderStatsImpl offloaderStats = (LedgerOffloaderStatsImpl) this.offloaderStats; + + assertEquals(offloaderStats.getOffloadError(topic), 0); + assertTrue(offloaderStats.getOffloadBytes(topic) > 0 ); + assertTrue(offloaderStats.getReadLedgerLatency(topic).count > 0); + assertEquals(offloaderStats.getWriteStorageError(topic), 0); + + Map map = new HashMap<>(); + map.putAll(offloader.getOffloadDriverMetadata()); + map.put("ManagedLedgerName", topic); + ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, map).get(); + LedgerEntries toTestEntries = toTest.read(0, toTest.getLastAddConfirmed()); + Iterator toTestIter = toTestEntries.iterator(); + while (toTestIter.hasNext()) { + LedgerEntry toTestEntry = toTestIter.next(); + } + + assertEquals(offloaderStats.getReadOffloadError(topic), 0); + assertTrue(offloaderStats.getReadOffloadBytes(topic) > 0); + assertTrue(offloaderStats.getReadOffloadDataLatency(topic).count > 0); + assertTrue(offloaderStats.getReadOffloadIndexLatency(topic).count > 0); + } + @Test public void testOffloadFailInitDataBlockUpload() throws Exception { ReadHandle readHandle = buildReadHandle(); @@ -497,7 +535,7 @@ public void testReadEOFException() throws Throwable { try { toTest.readAsync(0, 0).get(); } catch (Exception e) { - fail("Get unexpected exception when reading entries", e); + Assert.fail("Get unexpected exception when reading entries", e); } }