From b6c280a40505ee0c8d2f0dfa6e7481ae2e9f0b87 Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Thu, 31 Aug 2023 15:12:03 +0800 Subject: [PATCH] cherry-pick from #21059 --- .../PulsarLedgerUnderreplicationManager.java | 29 ++++++++ .../LedgerUnderreplicationManagerTest.java | 66 +++++++++++++++++++ 2 files changed, 95 insertions(+) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java index 79fdc44cb2b064..dda8d7256ed52f 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarLedgerUnderreplicationManager.java @@ -25,10 +25,12 @@ import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat; import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat; +import com.google.common.base.Joiner; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.TextFormat; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; @@ -61,6 +63,8 @@ import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.CreateOption; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; +import org.apache.pulsar.metadata.impl.ZKMetadataStore; +import org.apache.zookeeper.KeeperException; @Slf4j public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicationManager { @@ -393,6 +397,31 @@ public void markLedgerReplicated(long ledgerId) throws ReplicationException.Unav Lock l = heldLocks.get(ledgerId); if (l != null) { store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get(); + if (store instanceof ZKMetadataStore) { + try { + // clean up the hierarchy + String[] parts = getUrLedgerPath(ledgerId).split("/"); + for (int i = 1; i <= 4; i++) { + String[] p = Arrays.copyOf(parts, parts.length - i); + String path = Joiner.on("/").join(p); + Optional getResult = store.get(path).get(); + if (getResult.isPresent()) { + store.delete(path, Optional.of(getResult.get().getStat().getVersion())).get(); + } + } + } catch (ExecutionException ee) { + // This can happen when cleaning up the hierarchy. + // It's safe to ignore, it simply means another + // ledger in the same hierarchy has been marked as + // underreplicated. + if (ee.getCause() instanceof MetadataStoreException && ee.getCause().getCause() + instanceof KeeperException.NotEmptyException) { + //do nothing. + } else { + log.warn("Error deleting underrepcalited ledger parent node", ee); + } + } + } } } catch (ExecutionException ee) { if (ee.getCause() instanceof MetadataStoreException.NotFoundException) { diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java index 0df325b3c57a06..649dc1663c68f3 100644 --- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java +++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/LedgerUnderreplicationManagerTest.java @@ -23,12 +23,14 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import com.google.protobuf.TextFormat; +import java.lang.reflect.Field; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -54,6 +56,7 @@ import org.apache.bookkeeper.util.BookKeeperConstants; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.metadata.BaseMetadataStoreTest; +import org.apache.pulsar.metadata.api.GetResult; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -296,6 +299,69 @@ public void testMarkingAsReplicated(String provider, Supplier urlSupplie assertEquals(l, lB.get(), "Should be the ledger I marked"); } + + @Test(timeOut = 10000) + public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws Exception { + methodSetup(stringSupplier(() -> zks.getConnectionString())); + + String missingReplica = "localhost:3181"; + + @Cleanup + LedgerUnderreplicationManager m1 = lmf.newLedgerUnderreplicationManager(); + + Long ledgerA = 0xfeadeefdacL; + m1.markLedgerUnderreplicated(ledgerA, missingReplica); + + Field storeField = m1.getClass().getDeclaredField("store"); + storeField.setAccessible(true); + MetadataStoreExtended metadataStore = (MetadataStoreExtended) storeField.get(m1); + + String fiveLevelPath = PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA); + Optional getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String fourLevelPath = fiveLevelPath.substring(0, fiveLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String threeLevelPath = fourLevelPath.substring(0, fourLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String twoLevelPath = fourLevelPath.substring(0, threeLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + String oneLevelPath = fourLevelPath.substring(0, twoLevelPath.lastIndexOf("/")); + getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + + long ledgerToRereplicate = m1.getLedgerToRereplicate(); + assertEquals(ledgerToRereplicate, ledgerA); + m1.markLedgerReplicated(ledgerA); + + getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS); + assertFalse(getResult.isPresent()); + + getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS); + assertTrue(getResult.isPresent()); + } + /** * Test releasing of a ledger * A ledger is released when a client decides it does not want