Skip to content

Commit

Permalink
Add metrics for writing or reading size of cursor (apache#11500)
Browse files Browse the repository at this point in the history
### Motivation
Currently, there is no visibility into the following activities:
- How many bytes are written by a cursor update?
- How many bytes are read when loading a cursor?
So when the BookKeeper cluster is under heavy traffic, it is hard to tell which topic or namespace contributes most of the traffic.

Add metrics at the broker about how many bytes are written and read per cursor/namespace.

### Modifications
Add metrics `writeLedgerSize`, `writeLedgerLogicalSize`, `readLedgerSize`.
  • Loading branch information
Technoboy- authored and ciaocloud committed Oct 16, 2021
1 parent 3bcdf5c commit bb96498
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ public interface ManagedCursorMXBean {
String getLedgerName();

/**
* persist cursor by ledger
* persist cursor by ledger.
* @param success
*/
void persistToLedger(boolean success);

/**
* persist cursor by zookeeper
* persist cursor by zookeeper.
* @param success
*/
void persistToZookeeper(boolean success);
Expand All @@ -70,4 +70,34 @@ public interface ManagedCursorMXBean {
*/
long getPersistZookeeperErrors();

/**
* Add write data to a ledger of a cursor (in bytes).
* This will update writeCursorLedgerLogicalSize and writeCursorLedgerSize, the two counters
* exposed by {@link #getWriteCursorLedgerLogicalSize()} and {@link #getWriteCursorLedgerSize()}.
*
* @param size Size of data written to cursor (in bytes). ManagedCursorMXBeanImpl treats this as
*             the logical size and multiplies it by the write quorum size for the replicated total.
*/
void addWriteCursorLedgerSize(long size);

/**
* Add read data from a ledger of a cursor (in bytes).
* The accumulated value is exposed by {@link #getReadCursorLedgerSize()}.
*
* @param size Size of data read from cursor (in bytes)
*/
void addReadCursorLedgerSize(long size);

/**
* Returns the accumulated size of cursor-ledger writes recorded through
* {@link #addWriteCursorLedgerSize(long)}. Unlike {@link #getWriteCursorLedgerLogicalSize()},
* ManagedCursorMXBeanImpl scales every recorded write by the write quorum size for this counter.
*
* @return the size of data written to cursor (in bytes)
*/
long getWriteCursorLedgerSize();

/**
* Returns the accumulated size of cursor-ledger writes recorded through
* {@link #addWriteCursorLedgerSize(long)}, counting each write once regardless of replication.
*
* @return the size of data written to cursor without replicas (in bytes)
*/
long getWriteCursorLedgerLogicalSize();

/**
* Returns the accumulated size of cursor-ledger reads recorded through
* {@link #addReadCursorLedgerSize(long)}.
*
* @return the size of data read from cursor (in bytes)
*/
long getReadCursorLedgerSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
}

LedgerEntry entry = seq.nextElement();
mbean.addReadCursorLedgerSize(entry.getLength());
PositionInfo positionInfo;
try {
positionInfo = PositionInfo.parseFrom(entry.getEntry());
Expand Down Expand Up @@ -2599,7 +2600,8 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

checkNotNull(lh);
lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
byte[] data = pi.toByteArray();
lh.asyncAddEntry(data, (rc, lh1, entryId, ctx) -> {
if (rc == BKException.Code.OK) {
if (log.isDebugEnabled()) {
log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", ledger.getName(), name, position,
Expand All @@ -2614,6 +2616,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
callback.operationComplete();
} else {
log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", ledger.getName(), name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public class ManagedCursorMXBeanImpl implements ManagedCursorMXBean {
private final LongAdder persistZookeeperSucceed = new LongAdder();
private final LongAdder persistZookeeperFailed = new LongAdder();

private final LongAdder writeCursorLedgerSize = new LongAdder();
private final LongAdder writeCursorLedgerLogicalSize = new LongAdder();
private final LongAdder readCursorLedgerSize = new LongAdder();

private final ManagedCursor managedCursor;

public ManagedCursorMXBeanImpl(ManagedCursor managedCursor) {
Expand Down Expand Up @@ -83,4 +87,30 @@ public long getPersistZookeeperSucceed() {
public long getPersistZookeeperErrors() {
return persistZookeeperFailed.longValue();
}

/**
 * Records one cursor-ledger write in both write counters.
 *
 * <p>The logical counter grows by {@code size}; the replicated counter grows by {@code size}
 * multiplied by the write quorum size taken from the cursor's configuration.
 *
 * @param size size of the data written to the cursor ledger (in bytes), without replicas
 */
@Override
public void addWriteCursorLedgerSize(final long size) {
    // The write quorum size is only reachable through the concrete cursor implementation.
    final ManagedCursorImpl cursorImpl = (ManagedCursorImpl) managedCursor;
    final long replicatedSize = size * cursorImpl.config.getWriteQuorumSize();

    writeCursorLedgerSize.add(replicatedSize);
    writeCursorLedgerLogicalSize.add(size);
}

/**
 * Records one cursor-ledger read by adding {@code size} to the read counter.
 *
 * @param size size of the data read from the cursor ledger (in bytes)
 */
@Override
public void addReadCursorLedgerSize(final long size) {
readCursorLedgerSize.add(size);
}

/**
 * @return the current total of the replicated write counter (in bytes), i.e. every recorded
 *         write multiplied by the write quorum size in effect when it was recorded
 */
@Override
public long getWriteCursorLedgerSize() {
    final long replicatedBytes = writeCursorLedgerSize.sum();
    return replicatedBytes;
}

/**
 * @return the current total of the logical write counter (in bytes), counting each recorded
 *         write once without replicas
 */
@Override
public long getWriteCursorLedgerLogicalSize() {
    final long logicalBytes = writeCursorLedgerLogicalSize.sum();
    return logicalBytes;
}

/**
 * @return the current total of the read counter (in bytes)
 */
@Override
public long getReadCursorLedgerSize() {
    final long readBytes = readCursorLedgerSize.sum();
    return readBytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ private List<Metrics> aggregate() {
metrics.put("brk_ml_cursor_persistLedgerErrors", cStats.getPersistLedgerErrors());
metrics.put("brk_ml_cursor_persistZookeeperSucceed", cStats.getPersistZookeeperSucceed());
metrics.put("brk_ml_cursor_persistZookeeperErrors", cStats.getPersistZookeeperErrors());
metrics.put("brk_ml_cursor_writeLedgerSize", cStats.getWriteCursorLedgerSize());
metrics.put("brk_ml_cursor_writeLedgerLogicalSize", cStats.getWriteCursorLedgerLogicalSize());
metrics.put("brk_ml_cursor_readLedgerSize", cStats.getReadCursorLedgerSize());
metricsCollection.add(metrics);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.stats;

import lombok.Cleanup;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.stats.metrics.ManagedCursorMetrics;
Expand Down Expand Up @@ -90,4 +91,66 @@ public void testManagedCursorMetrics() throws Exception {
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_nonContiguousDeletedMessagesRange"), 0L);
}

/**
 * Checks that ManagedCursorMetrics reports the cursor-ledger size metrics
 * (brk_ml_cursor_writeLedgerSize, brk_ml_cursor_writeLedgerLogicalSize and
 * brk_ml_cursor_readLedgerSize) after messages are published to a topic with two
 * subscriptions and acknowledged.
 */
@Test
public void testCursorReadWriteMetrics() throws Exception {
final String subName = "read-write";
final String topicName = "persistent://my-namespace/use/my-ns/read-write";
// Number of messages to publish (used as the loop bound below), not a size in bytes.
final int messageSize = 10;

ManagedCursorMetrics metrics = new ManagedCursorMetrics(pulsar);

// No subscription has been created yet, so no cursor metrics are expected.
List<Metrics> metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

// NOTE(review): this repeats the check above verbatim; presumably it confirms that a second
// generate() call is still empty - confirm it is intentional.
metricsList = metrics.generate();
Assert.assertTrue(metricsList.isEmpty());

// Two Shared subscriptions on the same topic: subName and subName + "-2".
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionName(subName + "-2")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();

// Close every ledger handle currently known to the mock BookKeeper before publishing.
// NOTE(review): presumably this influences where/how cursor positions get persisted
// afterwards - confirm the intent.
for (PulsarMockLedgerHandle ledgerHandle : mockBookKeeper.getLedgerMap().values()) {
ledgerHandle.close();
}

// Publish messageSize messages. Every message is received through `consumer`; even
// iterations acknowledge it on `consumer`, odd iterations acknowledge the same id on
// `consumer2`.
// NOTE(review): consumer2 never calls receive() itself and acknowledges ids obtained from
// consumer - the expected values below depend on this; confirm it is intended.
for (int i = 0; i < messageSize; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
if (i % 2 == 0) {
consumer.acknowledge(consumer.receive().getMessageId());
} else {
consumer2.acknowledge(consumer.receive().getMessageId());
}
}
metricsList = metrics.generate();
Assert.assertEquals(metricsList.size(), 3);
// In every entry the asserted writeLedgerSize is twice writeLedgerLogicalSize; presumably the
// test setup uses a write quorum size of 2 - TODO confirm. Nothing is expected to be read back.
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(0).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 26L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 13L);
Assert.assertEquals(metricsList.get(1).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);

// The third entry's write sizes equal the sum of the first two entries' values.
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerSize"), 52L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_writeLedgerLogicalSize"), 26L);
Assert.assertEquals(metricsList.get(2).getMetrics().get("brk_ml_cursor_readLedgerSize"), 0L);
}
}
3 changes: 3 additions & 0 deletions site2/docs/reference-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ brk_ml_cursor_persistLedgerErrors(namespace="", ledger_name="", cursor_name:"")|
brk_ml_cursor_persistZookeeperSucceed(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of acknowledgment states that is persistent to ZooKeeper.
brk_ml_cursor_persistZookeeperErrors(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of ledger errors occurred when acknowledgment states fail to be persistent to ZooKeeper.
brk_ml_cursor_nonContiguousDeletedMessagesRange(namespace="", ledger_name="", cursor_name:"")|Gauge|The number of non-contiguous deleted messages ranges.
brk_ml_cursor_writeLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size, in bytes, of data written to the cursor ledger, including replicas.
brk_ml_cursor_writeLedgerLogicalSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size, in bytes, of data written to the cursor ledger, excluding replicas.
brk_ml_cursor_readLedgerSize(namespace="", ledger_name="", cursor_name:"")|Gauge|The size, in bytes, of data read from the cursor ledger.
### LoadBalancing metrics
All the loadbalancing metrics are labelled with the following labels:
Expand Down

0 comments on commit bb96498

Please sign in to comment.