From aff71576bdc32e107b85717e72c90381b8fb7f0d Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Wed, 29 Mar 2017 14:46:08 -0700 Subject: [PATCH] fix: added read-latency histogram, reduce histogram-highestTrackableTime, measuring time using nano-sec, 3 write-count for create-ledger --- .../mledger/impl/ManagedCursorImpl.java | 7 +- .../mledger/impl/ManagedLedgerImpl.java | 29 ++++---- .../bookkeeper/mledger/impl/MetaStore.java | 12 ++-- .../mledger/impl/MetaStoreImplZookeeper.java | 69 ++++++++++--------- .../mledger/util/DimensionStats.java | 4 +- .../broker/namespace/OwnershipCache.java | 8 +-- .../stats/BrokerOperabilityMetrics.java | 31 ++++++--- .../broker/namespace/OwnershipCacheTest.java | 9 ++- .../broker/service/BrokerServiceTest.java | 2 +- .../stats/ManagedLedgerMetricsTest.java | 3 +- 10 files changed, 100 insertions(+), 74 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 6c17cf283af96..e3ad5073da7a1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -222,9 +222,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookkeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getStore().recordRead(System.currentTimeMillis() - now); + ledger.getStore().recordReadLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); } @@ -1825,10 +1825,9 @@ void internalFlushPendingMarkDeletes() { void createNewMetadataLedger(final VoidCallback callback) { ledger.mbean.startCursorLedgerCreateOp(); - final long now = System.currentTimeMillis(); bookkeeper.asyncCreateLedger(config.getMetadataEnsemblesize(), config.getMetadataWriteQuorumSize(), config.getMetadataAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - ledger.getStore().recordWrite(System.currentTimeMillis() - now); + ledger.getStore().recordWriteCount(3L); // create-ledger-performs-3-write ledger.getExecutor().submit(safeRun(() -> { ledger.mbean.endCursorLedgerCreateOp(); if (rc != BKException.Code.OK) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 752d09004d3ee..35fe2a649a172 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -248,7 +248,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (log.isDebugEnabled()) { log.debug("[{}] Opening legder {}", name, id); } - store.recordRead(); + store.recordReadCount(1L); mbean.startDataLedgerOpenOp(); bookKeeper.asyncOpenLedger(id, config.getDigestType(), config.getPassword(), opencb, null); } else { @@ -277,9 +277,9 @@ private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedg TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); } else { iterator.remove(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(li.getLedgerId(), (rc, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] Deleted empty ledger ledgerId={} rc={}", name, li.getLedgerId(), rc); } @@ -303,10 +303,9 @@ public void operationFailed(MetaStoreException e) { // Create a new ledger to start writing this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - final long now = System.currentTimeMillis(); bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), (rc, lh, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteCount(3L); executor.submitOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { @@ -483,7 +482,7 @@ public synchronized void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - store.recordWrite(); + store.recordWriteCount(3); // create-ledger performs 3 writes on zk bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, ctx); } @@ -1082,10 +1081,10 @@ synchronized void ledgerClosed(final LedgerHandle lh) { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); mbean.startDataLedgerDeleteOp(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> { mbean.endDataLedgerDeleteOp(); - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); log.info("[{}] Delete complete for empty ledger {}. rc={}", name, lh.getId(), rc); }, null); } @@ -1100,7 +1099,7 @@ synchronized void ledgerClosed(final LedgerHandle lh) { STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.nanoTime(); mbean.startDataLedgerCreateOp(); - store.recordWrite(); + store.recordWriteCount(3); // create-ledger performs 3 writes on zk bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), config.getAckQuorumSize(), config.getDigestType(), config.getPassword(), this, null); } @@ -1167,10 +1166,10 @@ CompletableFuture getLedgerHandle(long ledgerId) { log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId); } mbean.startDataLedgerOpenOp(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncOpenLedger(ledgerId, config.getDigestType(), config.getPassword(), (int rc, LedgerHandle lh, Object ctx) -> { - store.recordRead(System.currentTimeMillis() - now); + store.recordReadLatency(System.nanoTime() - now, 1L); executor.submit(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (rc != BKException.Code.OK) { @@ -1458,9 +1457,9 @@ public void operationComplete(Void result, Stat stat) { for (LedgerInfo ls : ledgersToDelete) { log.info("[{}] Removing ledger {} - size: {}", name, ls.getLedgerId(), ls.getSize()); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == BKException.Code.NoSuchLedgerExistsException) { log.warn("[{}] Ledger was already deleted {}", name, ls.getLedgerId()); } else if (rc != BKException.Code.OK) { @@ -1576,9 +1575,9 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] Deleting ledger {}", name, ls); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); bookKeeper.asyncDeleteLedger(ls.getLedgerId(), (rc, ctx1) -> { - store.recordWrite(System.currentTimeMillis() - now); + store.recordWriteLatency(System.nanoTime() - now, 1L); switch (rc) { case BKException.Code.NoSuchLedgerExistsException: log.warn("[{}] Ledger {} not found when deleting it", name, ls.getLedgerId()); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index e03e4171bae83..511ad13230c51 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -135,28 +135,28 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn Iterable getManagedLedgers() throws MetaStoreException; /** - * Record write zk operation with latency for zk-op stats + * Record write zk write with latency (in nano-seconds) for zk-op stats * * @param latency */ - void recordWrite(long latency); + void recordWriteLatency(long latencyInNs, long count); /** * Record write zk operation for zk-op stats * */ - void recordWrite(); + void recordWriteCount(long count); /** - * Record read zk operation with latency for zk-op stats + * Record read zk read with latency (in nano-seconds) for zk-op stats * * @param latency */ - void recordRead(long latency); + void recordReadLatency(long latency, long count); /** * Record read zk operation for zk-op stats * */ - void recordRead(); + void recordReadCount(long count); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index bb63fa9d8560b..000160528d0c6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,6 +20,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; @@ -61,7 +62,8 @@ public static enum ZNodeProtobufFormat { private final OrderedSafeExecutor executor; private final LongAdder numWrite; private final LongAdder numRead; - private final DimensionStats zkOpLatencyStats; + private final DimensionStats zkWriteLatencyStats; + private final DimensionStats zkReadLatencyStats; private static class ZKStat implements Stat { private final int version; @@ -107,7 +109,8 @@ public MetaStoreImplZookeeper(ZooKeeper zk, ZNodeProtobufFormat protobufFormat, this.executor = executor; this.numWrite = new LongAdder(); this.numRead = new LongAdder(); - this.zkOpLatencyStats = new DimensionStats(); + this.zkWriteLatencyStats = new DimensionStats(); + this.zkReadLatencyStats = new DimensionStats(); if (zk.exists(prefixName, false) == null) { zk.create(prefixName, new byte[0], Acl, CreateMode.PERSISTENT); @@ -141,9 +144,9 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { @Override public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback callback) { // Try to get the content or create an empty node - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (rc == Code.OK.intValue()) { try { ManagedLedgerInfo info = parseManagedLedgerInfo(readData); @@ -185,10 +188,10 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St mlInfo.toString().getBytes(Encoding) : // Text format mlInfo.toByteArray(); // Binary format - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.setData(prefix + ledgerName, serializedMlInfo, zkStat.getVersion(), (rc, path, zkCtx, stat1) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] UpdateLedgersIdsCallback.processResult rc={} newVersion={}", ledgerName, Code.get(rc), stat != null ? stat.getVersion() : "null"); @@ -212,9 +215,9 @@ public void getCursors(final String ledgerName, final MetaStoreCallback executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] getConsumers complete rc={} children={}", ledgerName, Code.get(rc), children); } @@ -237,9 +240,9 @@ public void asyncGetCursorInfo(String ledgerName, String consumerName, if (log.isDebugEnabled()) { log.debug("Reading from {}", path); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.getData(path, false, (rc, path1, ctx, data, stat) -> executor.submit(safeRun(() -> { - recordRead(System.currentTimeMillis() - now); + recordReadLatency(System.nanoTime() - now, 1L); if (rc != Code.OK.intValue()) { callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } else { @@ -272,10 +275,10 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Creating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.create(path, content, Acl, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (rc != Code.OK.intValue()) { log.warn("[{}] Error creating cosumer {} node on meta-data store with {}: ", ledgerName, cursorName, info, Code.get(rc)); @@ -293,9 +296,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa if (log.isDebugEnabled()) { log.debug("[{}] Updating consumer {} on meta-data store with {}", ledgerName, cursorName, info); } - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.setData(path, content, zkStat.getVersion(), (rc, path1, ctx, stat1) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (rc == Code.BADVERSION.intValue()) { callback.operationFailed(new BadVersionException(KeeperException.create(Code.get(rc)))); } else if (rc != Code.OK.intValue()) { @@ -311,9 +314,9 @@ public void asyncUpdateCursorInfo(final String ledgerName, final String cursorNa public void asyncRemoveCursor(final String ledgerName, final String consumerName, final MetaStoreCallback callback) { log.info("[{}] Remove consumer={}", ledgerName, consumerName); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.delete(prefix + ledgerName + "/" + consumerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] [{}] zk delete done. rc={}", ledgerName, consumerName, Code.get(rc)); } @@ -328,9 +331,9 @@ public void asyncRemoveCursor(final String ledgerName, final String consumerName @Override public void removeManagedLedger(String ledgerName, MetaStoreCallback callback) { log.info("[{}] Remove ManagedLedger", ledgerName); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); zk.delete(prefix + ledgerName, -1, (rc, path, ctx) -> executor.submit(safeRun(() -> { - recordWrite(System.currentTimeMillis() - now); + recordWriteLatency(System.nanoTime() - now, 1L); if (log.isDebugEnabled()) { log.debug("[{}] zk delete done. rc={}", ledgerName, Code.get(rc)); } @@ -410,25 +413,25 @@ private ManagedCursorInfo parseManagedCursorInfoFromBinary(byte[] data) throws I } @Override - public void recordWrite(long latency) { - zkOpLatencyStats.recordValue(latency); - numWrite.increment(); + public void recordWriteLatency(long latencyInNs, long count) { + zkWriteLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count); + numWrite.add(count); } - + @Override - public void recordWrite() { - numWrite.increment(); + public void recordWriteCount(long count) { + numWrite.add(count); } @Override - public void recordRead(long latency) { - zkOpLatencyStats.recordValue(latency); - numRead.increment(); + public void recordReadLatency(long latencyInNs, long count) { + zkReadLatencyStats.recordValue(TimeUnit.NANOSECONDS.toMillis(latencyInNs) / count); + numRead.add(count); } @Override - public void recordRead() { - numRead.increment(); + public void recordReadCount(long count) { + numRead.add(count); } public long getAndResetNumOfWrite() { @@ -439,8 +442,12 @@ public long getAndResetNumOfRead() { return numRead.sumThenReset(); } - public DimensionStats getZkOpLatencyStats() { - return this.zkOpLatencyStats; + public DimensionStats getZkWriteLatencyStats() { + return this.zkWriteLatencyStats; + } + + public DimensionStats getZkReadLatencyStats() { + return this.zkReadLatencyStats; } private static final Logger log = LoggerFactory.getLogger(MetaStoreImplZookeeper.class); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java index 99966b0bfc946..27508d30c8ddd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/DimensionStats.java @@ -41,7 +41,8 @@ public class DimensionStats { public double elapsedIntervalMs; - private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.MINUTES.toMillis(10), 2); + private final long maxTrackableSeconds = 120; + private Recorder dimensionTimeRecorder = new Recorder(TimeUnit.SECONDS.toMillis(maxTrackableSeconds), 2); private Histogram dimensionHistogram = null; public void updateStats() { @@ -58,6 +59,7 @@ public void updateStats() { } public void recordValue(long dimensionLatencyMs) { + dimensionLatencyMs = dimensionLatencyMs > maxTrackableSeconds ? maxTrackableSeconds : dimensionLatencyMs; dimensionTimeRecorder.recordValue(dimensionLatencyMs); } } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java index bc3eb8b4656df..4636da13c9596 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java @@ -127,10 +127,10 @@ public CompletableFuture asyncLoad(String namespaceBundleZNode, Exe } CompletableFuture future = new CompletableFuture<>(); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); ZkUtils.asyncCreateFullPathOptimistic(localZkCache.getZooKeeper(), namespaceBundleZNode, znodeContent, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, (rc, path, ctx, name) -> { - metaStore.recordWrite(System.currentTimeMillis() - now); + metaStore.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == KeeperException.Code.OK.intValue()) { if (LOG.isDebugEnabled()) { LOG.debug("Successfully acquired zk lock on {}", namespaceBundleZNode); @@ -258,9 +258,9 @@ public CompletableFuture tryAcquiringOwnership(Namespace public CompletableFuture removeOwnership(NamespaceBundle bundle) { CompletableFuture result = new CompletableFuture<>(); String key = ServiceUnitZkUtils.path(bundle); - final long now = System.currentTimeMillis(); + final long now = System.nanoTime(); localZkCache.getZooKeeper().delete(key, -1, (rc, path, ctx) -> { - metaStore.recordWrite(System.currentTimeMillis() - now); + metaStore.recordWriteLatency(System.nanoTime() - now, 1L); if (rc == KeeperException.Code.OK.intValue() || rc == KeeperException.Code.NONODE.intValue()) { LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc)); ownedBundlesCache.synchronous().invalidate(key); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java index 38e7a88a0aa45..3244d8bd747aa 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -76,16 +76,27 @@ public Metrics getZkLatencyMetrics(MetaStoreImplZookeeper metaStore) { dimensionMap.put("metric", "zk_op_stats"); Metrics dMetrics = Metrics.create(dimensionMap); - DimensionStats zkOpLatencyStats = metaStore.getZkOpLatencyStats(); - zkOpLatencyStats.updateStats(); - - dMetrics.put("zk_latency_mean_ms", zkOpLatencyStats.meanDimensionMs); - dMetrics.put("zk_latency_time_median_ms", zkOpLatencyStats.medianDimensionMs); - dMetrics.put("zk_latency_95percentile_ms", zkOpLatencyStats.dimension95Ms); - dMetrics.put("zk_latency_99_percentile_ms", zkOpLatencyStats.dimension99Ms); - dMetrics.put("zk_latency_99_9_percentile_ms", zkOpLatencyStats.dimension999Ms); - dMetrics.put("zk_latency_99_99_percentile_ms", zkOpLatencyStats.dimension999Ms); - dMetrics.put("zk_op_count", zkOpLatencyStats.dimensionCounts); + DimensionStats zkWriteLatencyStats = metaStore.getZkWriteLatencyStats(); + DimensionStats zkReadLatencyStats = metaStore.getZkReadLatencyStats(); + zkWriteLatencyStats.updateStats(); + zkReadLatencyStats.updateStats(); + + dMetrics.put("zk_latency_write_mean_ms", zkWriteLatencyStats.meanDimensionMs); + dMetrics.put("zk_latency_write_time_median_ms", zkWriteLatencyStats.medianDimensionMs); + dMetrics.put("zk_latency_write_95percentile_ms", zkWriteLatencyStats.dimension95Ms); + dMetrics.put("zk_latency_write_99_percentile_ms", zkWriteLatencyStats.dimension99Ms); + dMetrics.put("zk_latency_write_99_9_percentile_ms", zkWriteLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_write_99_99_percentile_ms", zkWriteLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_write_count", zkWriteLatencyStats.dimensionCounts); + + dMetrics.put("zk_latency_read_mean_ms", zkReadLatencyStats.meanDimensionMs); + dMetrics.put("zk_latency_read_time_median_ms", zkReadLatencyStats.medianDimensionMs); + dMetrics.put("zk_latency_read_95percentile_ms", zkReadLatencyStats.dimension95Ms); + dMetrics.put("zk_latency_read_99_percentile_ms", zkReadLatencyStats.dimension99Ms); + dMetrics.put("zk_latency_read_99_9_percentile_ms", zkReadLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_read_99_99_percentile_ms", zkReadLatencyStats.dimension999Ms); + dMetrics.put("zk_latency_read_count", zkReadLatencyStats.dimensionCounts); + dMetrics.put("zk_write_rate", metaStore.getAndResetNumOfWrite()); dMetrics.put("zk_read_rate", metaStore.getAndResetNumOfRead()); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java index f146c1a6f47dd..52504f4d8d03e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/OwnershipCacheTest.java @@ -28,8 +28,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper; import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -59,6 +60,8 @@ public class OwnershipCacheTest { private NamespaceBundleFactory bundleFactory; private NamespaceService nsService; private BrokerService brokerService; + private ManagedLedgerFactoryImpl ledgerFactory; + private MetaStoreImplZookeeper metaStore; private OrderedSafeExecutor executor; @BeforeMethod @@ -67,12 +70,14 @@ public void setup() throws Exception { selfBrokerUrl = "tcp://localhost:" + port; pulsar = mock(PulsarService.class); config = mock(ServiceConfiguration.class); + ledgerFactory = mock(ManagedLedgerFactoryImpl.class); executor = new OrderedSafeExecutor(1, "test"); zkCache = new LocalZooKeeperCache(MockZooKeeper.newInstance(), executor); localCache = new LocalZooKeeperCacheService(zkCache, null); bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32()); nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); + metaStore = new MetaStoreImplZookeeper(zkCache.getZooKeeper(), executor); doReturn(CompletableFuture.completedFuture(1)).when(brokerService).unloadServiceUnit(anyObject()); doReturn(zkCache).when(pulsar).getLocalZkCache(); @@ -83,6 +88,8 @@ public void setup() throws Exception { doReturn(brokerService).when(pulsar).getBrokerService(); doReturn(webAddress(config)).when(pulsar).getWebServiceAddress(); doReturn(selfBrokerUrl).when(pulsar).getBrokerServiceUrl(); + doReturn(ledgerFactory).when(pulsar).getManagedLedgerFactory(); + doReturn(metaStore).when(ledgerFactory).getMetaStore(); } @AfterMethod diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java index 059cc167020fb..8c07c8dce930d 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/BrokerServiceTest.java @@ -302,7 +302,7 @@ public void testBrokerStatsMetrics() throws Exception { consumer.close(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); JsonArray metrics = brokerStatsClient.getMetrics(); - assertEquals(metrics.size(), 4, metrics.toString()); + assertEquals(metrics.size(), 5, metrics.toString()); // these metrics seem to be arriving in different order at different times... // is the order really relevant here? diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java index f107a548b1bac..5e03ee189b889 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/stats/ManagedLedgerMetricsTest.java @@ -93,7 +93,8 @@ public void testZkOpStatsMetrics() throws Exception { pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1"); Metrics zkOpMetric = getMetric("zk_op_stats"); Assert.assertNotNull(zkOpMetric); - Assert.assertTrue((double) zkOpMetric.getMetrics().get("zk_latenc_99_99_percentile_ms") > 0); + Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_write_99_99_percentile_ms")); + Assert.assertTrue(zkOpMetric.getMetrics().containsKey("zk_latency_read_99_99_percentile_ms")); Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_read_rate") > 0); Assert.assertTrue((long) zkOpMetric.getMetrics().get("zk_write_rate") > 0);