diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java index b7f0e0a1bc1e51..773f3944fbcd74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java @@ -19,6 +19,8 @@ package org.apache.pulsar.broker.delayed.bucket; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELAYED_BUCKET_KEY_PREFIX; +import static org.apache.pulsar.broker.delayed.bucket.Bucket.DELIMITER; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Range; import com.google.common.collect.RangeMap; @@ -27,10 +29,13 @@ import io.netty.util.Timeout; import io.netty.util.Timer; import java.time.Clock; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.NavigableSet; import java.util.Optional; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -39,11 +44,13 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue; @Slf4j @@ -91,9 +98,66 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat this.sharedBucketPriorityQueue = new TripleLongPriorityQueue(); this.immutableBuckets = TreeRangeMap.create(); this.snapshotSegmentLastIndexTable = HashBasedTable.create(); - this.numberDelayedMessages = 0L; ManagedCursor cursor = dispatcher.getCursor(); this.lastMutableBucket = new MutableBucket(cursor, bucketSnapshotStorage); + this.numberDelayedMessages = recoverBucketSnapshot(); + } + + private long recoverBucketSnapshot() throws RuntimeException { + ManagedCursor cursor = this.lastMutableBucket.cursor; + cursor.getCursorProperties().keySet().forEach(key -> { + if (key.startsWith(DELAYED_BUCKET_KEY_PREFIX)) { + String[] keys = key.split(DELIMITER); + checkArgument(keys.length == 3); + ImmutableBucket immutableBucket = + new ImmutableBucket(cursor, this.lastMutableBucket.bucketSnapshotStorage, + Long.parseLong(keys[1]), Long.parseLong(keys[2])); + immutableBuckets.put(Range.closed(immutableBucket.startLedgerId, immutableBucket.endLedgerId), + immutableBucket); + } + }); + + if (immutableBuckets.asMapOfRanges().isEmpty()) { + return 0; + } + + List> futures = new ArrayList<>(immutableBuckets.asMapOfRanges().size()); + for (ImmutableBucket immutableBucket : immutableBuckets.asMapOfRanges().values()) { + CompletableFuture future = + immutableBucket.asyncRecoverBucketSnapshotEntry(this::getCutoffTime).thenAccept(indexList -> { + if (CollectionUtils.isEmpty(indexList)) { + return; + } + DelayedIndex lastDelayedIndex = indexList.get(indexList.size() - 1); + synchronized (this.snapshotSegmentLastIndexTable) { + this.snapshotSegmentLastIndexTable.put(lastDelayedIndex.getLedgerId(), + lastDelayedIndex.getEntryId(), immutableBucket); + } + synchronized (this.sharedBucketPriorityQueue) { + for (DelayedIndex index : indexList) { + this.sharedBucketPriorityQueue.add(index.getTimestamp(), index.getLedgerId(), + index.getEntryId()); + } + } + }); + futures.add(future); + } + + try { + FutureUtil.waitForAll(futures).get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + + MutableLong numberDelayedMessages = new MutableLong(0); + immutableBuckets.asMapOfRanges().values().forEach(bucket -> { + numberDelayedMessages.add(bucket.numberBucketDelayedMessages); + }); + + log.info("[{}] Recover delayed message index bucket snapshot finish, buckets: {}, numberDelayedMessages: {}", + dispatcher.getName(), immutableBuckets.asMapOfRanges().size(), numberDelayedMessages.getValue()); + + return numberDelayedMessages.getValue(); } @Override @@ -242,7 +306,7 @@ public synchronized NavigableSet getScheduledMessages(int maxMessa // All message of current snapshot segment are scheduled, load next snapshot segment // TODO make it asynchronous and not blocking this process try { - bucket.asyncLoadNextBucketSnapshotEntry(false).thenAccept(indexList -> { + bucket.asyncLoadNextBucketSnapshotEntry().thenAccept(indexList -> { if (CollectionUtils.isEmpty(indexList)) { return; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java index 833030c5751721..1ef4a063716cfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/ImmutableBucket.java @@ -19,15 +19,22 @@ package org.apache.pulsar.broker.delayed.bucket; import static org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker.AsyncOperationTimeoutSeconds; +import com.google.protobuf.ByteString; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.mutable.MutableLong; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat; import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.DelayedIndex; +import org.apache.pulsar.broker.delayed.proto.DelayedMessageIndexBucketSnapshotFormat.SnapshotSegmentMetadata; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @Slf4j class ImmutableBucket extends Bucket { @@ -35,12 +42,16 @@ class ImmutableBucket extends Bucket { super(cursor, storage, startLedgerId, endLedgerId); } - /** - * Asynchronous load next bucket snapshot entry. - * @param isRecover whether used to recover bucket snapshot - * @return CompletableFuture - */ - CompletableFuture> asyncLoadNextBucketSnapshotEntry(boolean isRecover) { + CompletableFuture> asyncLoadNextBucketSnapshotEntry() { + return asyncLoadNextBucketSnapshotEntry(false, null); + } + + CompletableFuture> asyncRecoverBucketSnapshotEntry(Supplier cutoffTimeSupplier) { + return asyncLoadNextBucketSnapshotEntry(true, cutoffTimeSupplier); + } + + private CompletableFuture> asyncLoadNextBucketSnapshotEntry(boolean isRecover, + Supplier cutoffTimeSupplier) { if (log.isDebugEnabled()) { log.debug("[{}] Load next bucket snapshot data, bucket: {}", cursor.getName(), this); } @@ -54,7 +65,28 @@ CompletableFuture> asyncLoadNextBucketSnapshotEntry(boolean i final long bucketId = getAndUpdateBucketId(); CompletableFuture loadMetaDataFuture = new CompletableFuture<>(); if (isRecover) { - // TODO Recover bucket snapshot + final long cutoffTime = cutoffTimeSupplier.get(); + // Load Metadata of bucket snapshot + bucketSnapshotStorage.getBucketSnapshotMetadata(bucketId).thenAccept(snapshotMetadata -> { + List metadataList = + snapshotMetadata.getMetadataListList(); + + // Skip all already reach schedule time snapshot segments + int nextSnapshotEntryIndex = 0; + while (nextSnapshotEntryIndex < metadataList.size() + && metadataList.get(nextSnapshotEntryIndex).getMaxScheduleTimestamp() <= cutoffTime) { + nextSnapshotEntryIndex++; + } + + this.setLastSegmentEntryId(metadataList.size()); + this.recoverDelayedIndexBitMapAndNumber(nextSnapshotEntryIndex, metadataList); + + int nextSegmentEntryId = nextSnapshotEntryIndex + 1; + loadMetaDataFuture.complete(nextSegmentEntryId); + }).exceptionally(ex -> { + loadMetaDataFuture.completeExceptionally(ex); + return null; + }); } else { loadMetaDataFuture.complete(currentSegmentEntryId + 1); } @@ -82,6 +114,27 @@ CompletableFuture> asyncLoadNextBucketSnapshotEntry(boolean i }); } + private void recoverDelayedIndexBitMapAndNumber(int startSnapshotIndex, + List segmentMetadata) { + this.delayedIndexBitMap.clear(); + MutableLong numberMessages = new MutableLong(0); + for (int i = startSnapshotIndex; i < segmentMetadata.size(); i++) { + Map bitByteStringMap = segmentMetadata.get(i).getDelayedIndexBitMapMap(); + bitByteStringMap.forEach((leaderId, bitSetString) -> { + boolean exist = this.delayedIndexBitMap.containsKey(leaderId); + RoaringBitmap bitSet = + new ImmutableRoaringBitmap(bitSetString.asReadOnlyByteBuffer()).toRoaringBitmap(); + numberMessages.add(bitSet.getCardinality()); + if (!exist) { + this.delayedIndexBitMap.put(leaderId, bitSet); + } else { + this.delayedIndexBitMap.get(leaderId).or(bitSet); + } + }); + } + this.setNumberBucketDelayedMessages(numberMessages.getValue()); + } + void clear(boolean delete) { delayedIndexBitMap.clear(); getSnapshotCreateFuture().ifPresent(snapshotGenerateFuture -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java index 331ceb83a99943..abcde7902d84e8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerTest.java @@ -24,12 +24,17 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertFalse; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.lang.reflect.Method; +import java.nio.ByteBuffer; import java.time.Clock; +import java.util.Arrays; +import java.util.List; import java.util.NavigableMap; import java.util.Set; import java.util.TreeMap; @@ -40,6 +45,8 @@ import org.apache.pulsar.broker.delayed.bucket.BucketDelayedDeliveryTracker; import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.testng.annotations.AfterMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -156,4 +163,76 @@ public void testContainsMessage(DelayedDeliveryTracker tracker) { tracker.close(); } + + @Test(dataProvider = "delayedTracker", invocationCount = 10) + public void testRecoverSnapshot(BucketDelayedDeliveryTracker tracker) { + for (int i = 1; i <= 100; i++) { + tracker.addMessage(i, i, i * 10); + } + + assertEquals(tracker.getNumberOfDelayedMessages(), 100); + + clockTime.set(1 * 10); + + assertTrue(tracker.hasMessageAvailable()); + Set scheduledMessages = tracker.getScheduledMessages(100); + + assertEquals(scheduledMessages.size(), 1); + + tracker.addMessage(101, 101, 101 * 10); + + tracker.close(); + + clockTime.set(30 * 10); + + tracker = new BucketDelayedDeliveryTracker(dispatcher, timer, 1000, clock, + true, bucketSnapshotStorage, 5, TimeUnit.MILLISECONDS.toMillis(10), 50); + + assertFalse(tracker.containsMessage(101, 101)); + assertEquals(tracker.getNumberOfDelayedMessages(), 70); + + clockTime.set(100 * 10); + + assertTrue(tracker.hasMessageAvailable()); + scheduledMessages = tracker.getScheduledMessages(70); + + assertEquals(scheduledMessages.size(), 70); + + int i = 31; + for (PositionImpl scheduledMessage : scheduledMessages) { + assertEquals(scheduledMessage, PositionImpl.get(i, i)); + i++; + } + + tracker.close(); + } + + @Test + public void testRoaringBitmapSerialize() { + List data = List.of(1L, 3L, 5L, 10L, 16L, 18L, 999L, 0L); + RoaringBitmap roaringBitmap = new RoaringBitmap(); + for (Long datum : data) { + roaringBitmap.add(datum, datum + 1); + } + + assertEquals(roaringBitmap.getCardinality(), data.size()); + for (Long datum : data) { + assertTrue(roaringBitmap.contains(datum, datum + 1)); + } + + byte[] array = new byte[roaringBitmap.serializedSizeInBytes()]; + roaringBitmap.serialize(ByteBuffer.wrap(array)); + + RoaringBitmap roaringBitmap2 = new ImmutableRoaringBitmap(ByteBuffer.wrap(array)).toRoaringBitmap(); + assertEquals(roaringBitmap2.getCardinality(), data.size()); + for (Long datum : data) { + assertTrue(roaringBitmap2.contains(datum, datum + 1)); + } + + byte[] array2 = new byte[roaringBitmap2.serializedSizeInBytes()]; + roaringBitmap.serialize(ByteBuffer.wrap(array2)); + + assertTrue(Arrays.equals(array, array2)); + assertNotSame(array, array2); + } }