Skip to content

Commit

Permalink
[fix][broker] Immediately tombstone Deleted and Free state bundles (a…
Browse files Browse the repository at this point in the history
…pache#22743)

(cherry picked from commit 949260f)
  • Loading branch information
heesung-sn committed Jun 26, 2024
1 parent f467f37 commit d982d3b
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ public CompletableFuture<Optional<BrokerLookupData>> getOwnershipWithLookupDataA
}

public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
Optional<String> destinationBroker) {
Optional<String> destinationBroker,
boolean force) {
if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) {
log.info("Skip unloading namespace bundle: {}.", bundle);
return CompletableFuture.completedFuture(null);
Expand All @@ -642,7 +643,7 @@ public CompletableFuture<Void> unloadNamespaceBundleAsync(ServiceUnitId bundle,
log.warn(msg);
throw new IllegalArgumentException(msg);
}
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true);
Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force);
UnloadDecision unloadDecision =
new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin);
return unloadAsync(unloadDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,9 +826,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
}

if (isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit)
: CompletableFuture.completedFuture(0), serviceUnit, data)
// If data.force(), try closeServiceUnit and tombstone the bundle.
CompletableFuture<Void> future =
(data.force() ? closeServiceUnit(serviceUnit)
.thenCompose(__ -> tombstoneAsync(serviceUnit))
: CompletableFuture.completedFuture(0)).thenApply(__ -> null);
stateChangeListeners.notifyOnCompletion(future, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -840,9 +843,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted."));
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
tombstoneAsync(serviceUnit), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
return;
}
switch (state) {
case Deleted, Owned, Init -> this.complete(serviceUnit, t);
case Init -> this.complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public CompletableFuture<Void> waitAsync(CompletableFuture<Void> eventPubFuture,
public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) {
ServiceUnitState state = ServiceUnitStateData.state(data);

if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) {
if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) {
if (log.isDebugEnabled()) {
log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit);
}
Expand All @@ -113,7 +113,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable
}

switch (state) {
case Free, Owned -> this.complete(serviceUnit, t);
case Free -> {
if (!data.force()) {
complete(serviceUnit, t);
}
}
case Init -> {
if (data.force()) {
complete(serviceUnit, t);
}
}
case Owned -> complete(serviceUnit, t);
default -> {
if (log.isDebugEnabled()) {
log.debug("Handling {} for service unit {}", data, serviceUnit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
boolean closeWithoutWaitingClientDisconnect) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return ExtensibleLoadManagerImpl.get(loadManager.get())
.unloadNamespaceBundleAsync(bundle, destinationBroker);
.unloadNamespaceBundleAsync(bundle, destinationBroker, false);
}
// unload namespace bundle
OwnedBundle ob = ownershipCache.getOwnedBundle(bundle);
Expand Down Expand Up @@ -1232,7 +1232,8 @@ public CompletableFuture<Void> removeOwnedServiceUnitAsync(NamespaceBundle nsBun
CompletableFuture<Void> future;
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty());
future = extensibleLoadManager.unloadNamespaceBundleAsync(
nsBundle, Optional.empty(), true);
} else {
future = ownershipCache.removeOwnership(nsBundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,9 +359,12 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle)

@Test(timeOut = 30 * 1000)
public void testSplitBundleAdminAPI() throws Exception {
String namespace = defaultTestNamespace;
String topic = "persistent://" + namespace + "/test-split";
admin.topics().createPartitionedTopic(topic, 10);

final String namespace = "public/testSplitBundleAdminAPI";
admin.namespaces().createNamespace(namespace, 1);
Pair<TopicName, NamespaceBundle> topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split");
TopicName topicName = topicAndBundle.getLeft();
admin.topics().createPartitionedTopic(topicName.toString(), 10);
BundlesData bundles = admin.namespaces().getBundles(namespace);
int numBundles = bundles.getNumBundles();
var bundleRanges = bundles.getBoundaries().stream().map(Long::decode).sorted().toList();
Expand All @@ -388,16 +391,19 @@ public boolean test(NamespaceBundle namespaceBundle) {
long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2;

admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);

BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 1);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 1);
});

// Test split bundle with invalid bundle range.
try {
Expand All @@ -406,6 +412,30 @@ public boolean test(NamespaceBundle namespaceBundle) {
} catch (PulsarAdminException ex) {
assertTrue(ex.getMessage().contains("Invalid bundle range"));
}


// delete and retry
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace);
});
admin.namespaces().createNamespace(namespace, 1);
admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null);

Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
BundlesData bundlesData = admin.namespaces().getBundles(namespace);
assertEquals(bundlesData.getNumBundles(), numBundles + 1);
String lowBundle = String.format("0x%08x", bundleRanges.get(0));
String midBundle = String.format("0x%08x", mid);
String highBundle = String.format("0x%08x", bundleRanges.get(1));
assertTrue(bundlesData.getBoundaries().contains(lowBundle));
assertTrue(bundlesData.getBoundaries().contains(midBundle));
assertTrue(bundlesData.getBoundaries().contains(highBundle));
assertEquals(splitCount.get(), 2);
});
}

@Test(timeOut = 30 * 1000)
Expand Down Expand Up @@ -1221,7 +1251,11 @@ public void testTryAcquiringOwnership()
NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get();
assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl())
.contains(namespaceEphemeralData.getNativeUrl()));
admin.namespaces().deleteNamespace(namespace);
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
admin.namespaces().deleteNamespace(namespace, true);
});
}

@Test(timeOut = 30 * 1000)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ public void splitAndRetryTest() throws Exception {
childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty()));
channel1.publishSplitEventAsync(split);

waitUntilState(channel1, bundle, Deleted);
waitUntilState(channel2, bundle, Deleted);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);
// Verify the retry count
Expand Down Expand Up @@ -617,7 +617,7 @@ public void splitAndRetryTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
validateMonitorCounters(leader,
0,
1,
0,
0,
0,
0,
Expand Down Expand Up @@ -1233,15 +1233,15 @@ public void splitTestWhenProducerFails()

var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;

waitUntilStateWithMonitor(leader, bundle, Deleted);
waitUntilStateWithMonitor(channel1, bundle, Deleted);
waitUntilStateWithMonitor(channel2, bundle, Deleted);
waitUntilStateWithMonitor(leader, bundle, Init);
waitUntilStateWithMonitor(channel1, bundle, Init);
waitUntilStateWithMonitor(channel2, bundle, Init);

var ownerAddr1 = channel1.getOwnerAsync(bundle);
var ownerAddr2 = channel2.getOwnerAsync(bundle);

assertTrue(ownerAddr1.isCompletedExceptionally());
assertTrue(ownerAddr2.isCompletedExceptionally());
assertTrue(ownerAddr1.get().isEmpty());
assertTrue(ownerAddr2.get().isEmpty());


FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1425,13 +1425,15 @@ public void splitAndRetryFailureTest() throws Exception {
var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2;
((ServiceUnitStateChannelImpl) leader)
.monitorOwnerships(List.of(brokerId1, brokerId2));
waitUntilState(leader, bundle3, Deleted);
waitUntilState(channel1, bundle3, Deleted);
waitUntilState(channel2, bundle3, Deleted);

waitUntilState(leader, bundle3, Init);
waitUntilState(channel1, bundle3, Init);
waitUntilState(channel2, bundle3, Init);


validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0);

validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0);
validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0);
validateEventCounters(channel1, 1, 0, 1, 0, 0, 0);
validateEventCounters(channel2, 0, 0, 0, 0, 0, 0);

Expand Down Expand Up @@ -1461,7 +1463,7 @@ public void splitAndRetryFailureTest() throws Exception {

validateMonitorCounters(leader,
0,
1,
0,
1,
0,
0,
Expand Down Expand Up @@ -1539,7 +1541,7 @@ public void testOverrideInactiveBrokerStateData()
waitUntilNewOwner(channel2, ownedBundle, brokerId2);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true);
Expand Down Expand Up @@ -1602,7 +1604,7 @@ public void testOverrideOrphanStateData()
waitUntilNewOwner(channel2, ownedBundle, broker);
assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get());
assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally());
assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty());

// clean-up
FieldUtils.writeDeclaredField(channel1,
Expand Down Expand Up @@ -1660,8 +1662,10 @@ public void testActiveGetOwner() throws Exception {
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);
waitUntilState(channel1, bundle, Init);
waitUntilState(channel2, bundle, Init);

assertTrue(System.currentTimeMillis() - start < 20_000);
// simulate ownership cleanup(brokerId1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());

manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(inFlightUnloadRequests.size(), 0);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 1);

// Success with Init state.
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();

// Success with Owned state.
future = manager.waitAsync(CompletableFuture.completedFuture(null),
bundle, decision, 5, TimeUnit.SECONDS);
inFlightUnloadRequests = getinFlightUnloadRequests(manager);
assertEquals(inFlightUnloadRequests.size(), 1);
manager.handleEvent(bundle,
new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null);
assertEquals(inFlightUnloadRequests.size(), 0);
counterExpected.update(SplitDecision.Label.Success, Sessions);
assertEquals(counter.toMetrics(null).toString(),
counterExpected.toMetrics(null).toString());
future.get();
}

Expand Down
Loading

0 comments on commit d982d3b

Please sign in to comment.