Skip to content

Commit

Permalink
cherry-pick from apache#21059
Browse files Browse the repository at this point in the history
  • Loading branch information
horizonzy committed Sep 11, 2023
1 parent 15b7508 commit b6c280a
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import static org.apache.bookkeeper.proto.DataFormats.PlacementPolicyCheckFormat;
import static org.apache.bookkeeper.proto.DataFormats.ReplicasCheckFormat;
import static org.apache.bookkeeper.proto.DataFormats.UnderreplicatedLedgerFormat;
import com.google.common.base.Joiner;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TextFormat;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
Expand Down Expand Up @@ -61,6 +63,8 @@
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.KeeperException;

@Slf4j
public class PulsarLedgerUnderreplicationManager implements LedgerUnderreplicationManager {
Expand Down Expand Up @@ -393,6 +397,31 @@ public void markLedgerReplicated(long ledgerId) throws ReplicationException.Unav
Lock l = heldLocks.get(ledgerId);
if (l != null) {
store.delete(getUrLedgerPath(ledgerId), Optional.of(l.getLedgerNodeVersion())).get();
if (store instanceof ZKMetadataStore) {
try {
// clean up the hierarchy
String[] parts = getUrLedgerPath(ledgerId).split("/");
for (int i = 1; i <= 4; i++) {
String[] p = Arrays.copyOf(parts, parts.length - i);
String path = Joiner.on("/").join(p);
Optional<GetResult> getResult = store.get(path).get();
if (getResult.isPresent()) {
store.delete(path, Optional.of(getResult.get().getStat().getVersion())).get();
}
}
} catch (ExecutionException ee) {
// This can happen when cleaning up the hierarchy.
// It's safe to ignore, it simply means another
// ledger in the same hierarchy has been marked as
// underreplicated.
if (ee.getCause() instanceof MetadataStoreException && ee.getCause().getCause()
instanceof KeeperException.NotEmptyException) {
//do nothing.
} else {
log.warn("Error deleting underrepcalited ledger parent node", ee);
}
}
}
}
} catch (ExecutionException ee) {
if (ee.getCause() instanceof MetadataStoreException.NotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.protobuf.TextFormat;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand All @@ -54,6 +56,7 @@
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.metadata.BaseMetadataStoreTest;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -296,6 +299,69 @@ public void testMarkingAsReplicated(String provider, Supplier<String> urlSupplie
assertEquals(l, lB.get(), "Should be the ledger I marked");
}


@Test(timeOut = 10000)
public void testZkMetasStoreMarkReplicatedDeleteEmptyParentNodes() throws Exception {
methodSetup(stringSupplier(() -> zks.getConnectionString()));

String missingReplica = "localhost:3181";

@Cleanup
LedgerUnderreplicationManager m1 = lmf.newLedgerUnderreplicationManager();

Long ledgerA = 0xfeadeefdacL;
m1.markLedgerUnderreplicated(ledgerA, missingReplica);

Field storeField = m1.getClass().getDeclaredField("store");
storeField.setAccessible(true);
MetadataStoreExtended metadataStore = (MetadataStoreExtended) storeField.get(m1);

String fiveLevelPath = PulsarLedgerUnderreplicationManager.getUrLedgerPath(urLedgerPath, ledgerA);
Optional<GetResult> getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

String fourLevelPath = fiveLevelPath.substring(0, fiveLevelPath.lastIndexOf("/"));
getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

String threeLevelPath = fourLevelPath.substring(0, fourLevelPath.lastIndexOf("/"));
getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

String twoLevelPath = fourLevelPath.substring(0, threeLevelPath.lastIndexOf("/"));
getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

String oneLevelPath = fourLevelPath.substring(0, twoLevelPath.lastIndexOf("/"));
getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());

long ledgerToRereplicate = m1.getLedgerToRereplicate();
assertEquals(ledgerToRereplicate, ledgerA);
m1.markLedgerReplicated(ledgerA);

getResult = metadataStore.get(fiveLevelPath).get(1, TimeUnit.SECONDS);
assertFalse(getResult.isPresent());

getResult = metadataStore.get(fourLevelPath).get(1, TimeUnit.SECONDS);
assertFalse(getResult.isPresent());

getResult = metadataStore.get(threeLevelPath).get(1, TimeUnit.SECONDS);
assertFalse(getResult.isPresent());

getResult = metadataStore.get(twoLevelPath).get(1, TimeUnit.SECONDS);
assertFalse(getResult.isPresent());

getResult = metadataStore.get(oneLevelPath).get(1, TimeUnit.SECONDS);
assertFalse(getResult.isPresent());

getResult = metadataStore.get(urLedgerPath).get(1, TimeUnit.SECONDS);
assertTrue(getResult.isPresent());
}

/**
* Test releasing of a ledger
* A ledger is released when a client decides it does not want
Expand Down

0 comments on commit b6c280a

Please sign in to comment.