diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 855e8ccfcaaf37..fd4e94ba7774d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -623,7 +623,8 @@ public CompletableFuture> getOwnershipWithLookupDataA } public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, - Optional destinationBroker) { + Optional destinationBroker, + boolean force) { if (NamespaceService.isSLAOrHeartbeatNamespace(bundle.getNamespaceObject().toString())) { log.info("Skip unloading namespace bundle: {}.", bundle); return CompletableFuture.completedFuture(null); @@ -642,7 +643,7 @@ public CompletableFuture unloadNamespaceBundleAsync(ServiceUnitId bundle, log.warn(msg); throw new IllegalArgumentException(msg); } - Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, true); + Unload unload = new Unload(sourceBroker, bundle.toString(), destinationBroker, force); UnloadDecision unloadDecision = new UnloadDecision(unload, UnloadDecision.Label.Success, UnloadDecision.Reason.Admin); return unloadAsync(unloadDecision, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index e3c55f6784514b..d471f8c3842c7c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -826,9 +826,12 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { } if (isTargetBroker(data.sourceBroker())) { - stateChangeListeners.notifyOnCompletion( - data.force() ? closeServiceUnit(serviceUnit) - : CompletableFuture.completedFuture(0), serviceUnit, data) + // If data.force(), try closeServiceUnit and tombstone the bundle. + CompletableFuture future = + (data.force() ? closeServiceUnit(serviceUnit) + .thenCompose(__ -> tombstoneAsync(serviceUnit)) + : CompletableFuture.completedFuture(0)).thenApply(__ -> null); + stateChangeListeners.notifyOnCompletion(future, serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { stateChangeListeners.notify(serviceUnit, data, null); @@ -840,9 +843,13 @@ private void handleDeleteEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.completeExceptionally(new IllegalStateException(serviceUnit + "has been deleted.")); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + tombstoneAsync(serviceUnit), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java index 71ebbc92a87db0..ac21e4c624163f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManager.java @@ -97,7 +97,7 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable return; } switch (state) { - case Deleted, Owned, Init -> this.complete(serviceUnit, t); + case Init -> this.complete(serviceUnit, t); default -> { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java index bf9885b2a252ef..742b23dc2d2ec5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManager.java @@ -93,7 +93,7 @@ public CompletableFuture waitAsync(CompletableFuture eventPubFuture, public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable t) { ServiceUnitState state = ServiceUnitStateData.state(data); - if (StringUtils.isBlank(data.sourceBroker()) && (state == Owned || state == Assigning)) { + if ((state == Owned || state == Assigning) && StringUtils.isBlank(data.sourceBroker())) { if (log.isDebugEnabled()) { log.debug("Skipping {} for service unit {} from the assignment command.", data, serviceUnit); } @@ -113,7 +113,17 @@ public void handleEvent(String serviceUnit, ServiceUnitStateData data, Throwable } switch (state) { - case Free, Owned -> this.complete(serviceUnit, t); + case Free -> { + if (!data.force()) { + complete(serviceUnit, t); + } + } + case Init -> { + if (data.force()) { + complete(serviceUnit, t); + } + } + case Owned -> complete(serviceUnit, t); default -> { if (log.isDebugEnabled()) { log.debug("Handling {} for service unit {}", data, serviceUnit); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index 3b759f59070326..5814c6316dd0fd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -783,7 +783,7 @@ public CompletableFuture unloadNamespaceBundle(NamespaceBundle bundle, boolean closeWithoutWaitingClientDisconnect) { if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { return ExtensibleLoadManagerImpl.get(loadManager.get()) - .unloadNamespaceBundleAsync(bundle, destinationBroker); + .unloadNamespaceBundleAsync(bundle, destinationBroker, false); } // unload namespace bundle OwnedBundle ob = ownershipCache.getOwnedBundle(bundle); @@ -1232,7 +1232,8 @@ public CompletableFuture removeOwnedServiceUnitAsync(NamespaceBundle nsBun CompletableFuture future; if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get()); - future = extensibleLoadManager.unloadNamespaceBundleAsync(nsBundle, Optional.empty()); + future = extensibleLoadManager.unloadNamespaceBundleAsync( + nsBundle, Optional.empty(), true); } else { future = ownershipCache.removeOwnership(nsBundle); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index 2feaabe9f99fcd..f74da2a7022fca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -359,9 +359,12 @@ private void checkOwnershipState(String broker, NamespaceBundle bundle) @Test(timeOut = 30 * 1000) public void testSplitBundleAdminAPI() throws Exception { - String namespace = defaultTestNamespace; - String topic = "persistent://" + namespace + "/test-split"; - admin.topics().createPartitionedTopic(topic, 10); + + final String namespace = "public/testSplitBundleAdminAPI"; + admin.namespaces().createNamespace(namespace, 1); + Pair topicAndBundle = getBundleIsNotOwnByChangeEventTopic("test-split"); + TopicName topicName = topicAndBundle.getLeft(); + admin.topics().createPartitionedTopic(topicName.toString(), 10); BundlesData bundles = admin.namespaces().getBundles(namespace); int numBundles = bundles.getNumBundles(); var bundleRanges = bundles.getBoundaries().stream().map(Long::decode).sorted().toList(); @@ -388,16 +391,19 @@ public boolean test(NamespaceBundle namespaceBundle) { long mid = bundleRanges.get(0) + (bundleRanges.get(1) - bundleRanges.get(0)) / 2; admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); - - BundlesData bundlesData = admin.namespaces().getBundles(namespace); - assertEquals(bundlesData.getNumBundles(), numBundles + 1); - String lowBundle = String.format("0x%08x", bundleRanges.get(0)); - String midBundle = String.format("0x%08x", mid); - String highBundle = String.format("0x%08x", bundleRanges.get(1)); - assertTrue(bundlesData.getBoundaries().contains(lowBundle)); - assertTrue(bundlesData.getBoundaries().contains(midBundle)); - assertTrue(bundlesData.getBoundaries().contains(highBundle)); - assertEquals(splitCount.get(), 1); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + assertEquals(bundlesData.getNumBundles(), numBundles + 1); + String lowBundle = String.format("0x%08x", bundleRanges.get(0)); + String midBundle = String.format("0x%08x", mid); + String highBundle = String.format("0x%08x", bundleRanges.get(1)); + assertTrue(bundlesData.getBoundaries().contains(lowBundle)); + assertTrue(bundlesData.getBoundaries().contains(midBundle)); + assertTrue(bundlesData.getBoundaries().contains(highBundle)); + assertEquals(splitCount.get(), 1); + }); // Test split bundle with invalid bundle range. try { @@ -406,6 +412,30 @@ public boolean test(NamespaceBundle namespaceBundle) { } catch (PulsarAdminException ex) { assertTrue(ex.getMessage().contains("Invalid bundle range")); } + + + // delete and retry + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace); + }); + admin.namespaces().createNamespace(namespace, 1); + admin.namespaces().splitNamespaceBundle(namespace, firstBundle, true, null); + + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + BundlesData bundlesData = admin.namespaces().getBundles(namespace); + assertEquals(bundlesData.getNumBundles(), numBundles + 1); + String lowBundle = String.format("0x%08x", bundleRanges.get(0)); + String midBundle = String.format("0x%08x", mid); + String highBundle = String.format("0x%08x", bundleRanges.get(1)); + assertTrue(bundlesData.getBoundaries().contains(lowBundle)); + assertTrue(bundlesData.getBoundaries().contains(midBundle)); + assertTrue(bundlesData.getBoundaries().contains(highBundle)); + assertEquals(splitCount.get(), 2); + }); } @Test(timeOut = 30 * 1000) @@ -1221,7 +1251,11 @@ public void testTryAcquiringOwnership() NamespaceEphemeralData namespaceEphemeralData = primaryLoadManager.tryAcquiringOwnership(bundle).get(); assertTrue(Set.of(pulsar1.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrl()) .contains(namespaceEphemeralData.getNativeUrl())); - admin.namespaces().deleteNamespace(namespace); + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .untilAsserted(() -> { + admin.namespaces().deleteNamespace(namespace, true); + }); } @Test(timeOut = 30 * 1000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index ceb58e8d9647cf..72f8827632bd3a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -573,11 +573,11 @@ public void splitAndRetryTest() throws Exception { childBundle1Range, Optional.empty(), childBundle2Range, Optional.empty())); channel1.publishSplitEventAsync(split); - waitUntilState(channel1, bundle, Deleted); - waitUntilState(channel2, bundle, Deleted); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0); + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); // Verify the retry count @@ -617,7 +617,7 @@ public void splitAndRetryTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; validateMonitorCounters(leader, 0, - 1, + 0, 0, 0, 0, @@ -1233,15 +1233,15 @@ public void splitTestWhenProducerFails() var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; - waitUntilStateWithMonitor(leader, bundle, Deleted); - waitUntilStateWithMonitor(channel1, bundle, Deleted); - waitUntilStateWithMonitor(channel2, bundle, Deleted); + waitUntilStateWithMonitor(leader, bundle, Init); + waitUntilStateWithMonitor(channel1, bundle, Init); + waitUntilStateWithMonitor(channel2, bundle, Init); var ownerAddr1 = channel1.getOwnerAsync(bundle); var ownerAddr2 = channel2.getOwnerAsync(bundle); - assertTrue(ownerAddr1.isCompletedExceptionally()); - assertTrue(ownerAddr2.isCompletedExceptionally()); + assertTrue(ownerAddr1.get().isEmpty()); + assertTrue(ownerAddr2.get().isEmpty()); FieldUtils.writeDeclaredField(channel1, @@ -1425,13 +1425,15 @@ public void splitAndRetryFailureTest() throws Exception { var leader = channel1.isChannelOwnerAsync().get() ? channel1 : channel2; ((ServiceUnitStateChannelImpl) leader) .monitorOwnerships(List.of(brokerId1, brokerId2)); - waitUntilState(leader, bundle3, Deleted); - waitUntilState(channel1, bundle3, Deleted); - waitUntilState(channel2, bundle3, Deleted); + + waitUntilState(leader, bundle3, Init); + waitUntilState(channel1, bundle3, Init); + waitUntilState(channel2, bundle3, Init); - validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 0, 0, 1, 0); - validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 1, 0); + + validateHandlerCounters(channel1, 1, 0, 3, 0, 0, 0, 2, 1, 0, 0, 1, 0, 1, 0); + validateHandlerCounters(channel2, 1, 0, 3, 0, 0, 0, 2, 0, 0, 0, 1, 0, 1, 0); validateEventCounters(channel1, 1, 0, 1, 0, 0, 0); validateEventCounters(channel2, 0, 0, 0, 0, 0, 0); @@ -1461,7 +1463,7 @@ public void splitAndRetryFailureTest() throws Exception { validateMonitorCounters(leader, 0, - 1, + 0, 1, 0, 0, @@ -1539,7 +1541,7 @@ public void testOverrideInactiveBrokerStateData() waitUntilNewOwner(channel2, ownedBundle, brokerId2); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(leaderChannel, "maxCleanupDelayTimeInSecs", 3 * 60, true); @@ -1602,7 +1604,7 @@ public void testOverrideOrphanStateData() waitUntilNewOwner(channel2, ownedBundle, broker); assertEquals(Optional.empty(), channel2.getOwnerAsync(freeBundle).get()); assertTrue(channel2.getOwnerAsync(deletedBundle).isCompletedExceptionally()); - assertTrue(channel2.getOwnerAsync(splittingBundle).isCompletedExceptionally()); + assertTrue(channel2.getOwnerAsync(splittingBundle).get().isEmpty()); // clean-up FieldUtils.writeDeclaredField(channel1, @@ -1660,8 +1662,10 @@ public void testActiveGetOwner() throws Exception { "inFlightStateWaitingTimeInMillis", 20 * 1000, true); start = System.currentTimeMillis(); assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); - assertTrue(System.currentTimeMillis() - start < 20_000); + waitUntilState(channel1, bundle, Init); + waitUntilState(channel2, bundle, Init); + assertTrue(System.currentTimeMillis() - start < 20_000); // simulate ownership cleanup(brokerId1 selected owner) by the leader channel overrideTableViews(bundle, new ServiceUnitStateData(Owned, broker, null, 1)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java index 3287306ab48bad..57b7830214b92a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/SplitManagerTest.java @@ -123,40 +123,23 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Deleted, dstBroker, VERSION_ID_INIT), null); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(inFlightUnloadRequests.size(), 0); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); + assertEquals(inFlightUnloadRequests.size(), 1); - // Success with Init state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 1); + + // Success with Init state. manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Init, dstBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequests.size(), 0); counterExpected.update(SplitDecision.Label.Success, Sessions); assertEquals(counter.toMetrics(null).toString(), counterExpected.toMetrics(null).toString()); - future.get(); - // Success with Owned state. - future = manager.waitAsync(CompletableFuture.completedFuture(null), - bundle, decision, 5, TimeUnit.SECONDS); - inFlightUnloadRequests = getinFlightUnloadRequests(manager); - assertEquals(inFlightUnloadRequests.size(), 1); - manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, VERSION_ID_INIT), null); - assertEquals(inFlightUnloadRequests.size(), 0); - counterExpected.update(SplitDecision.Label.Success, Sessions); - assertEquals(counter.toMetrics(null).toString(), - counterExpected.toMetrics(null).toString()); future.get(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java index 45b1cd9544f91c..b7dae62062944c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/UnloadManagerTest.java @@ -123,11 +123,15 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int assertEquals(inFlightUnloadRequestMap.size(), 1); manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); + // Success with Init state. manager.handleEvent(bundle, - new ServiceUnitStateData(ServiceUnitState.Free, null, srcBroker, VERSION_ID_INIT), null); + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Init, null, srcBroker, true, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 1); @@ -137,17 +141,30 @@ public void testSuccess() throws IllegalAccessException, ExecutionException, Int bundle, unloadDecision, 5, TimeUnit.SECONDS); inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, null, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 1); - manager.handleEvent(bundle, new ServiceUnitStateData(ServiceUnitState.Owned, dstBroker, srcBroker, VERSION_ID_INIT), null); assertEquals(inFlightUnloadRequestMap.size(), 0); - future.get(); assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 2); + + // Success with Free state. + future = manager.waitAsync(CompletableFuture.completedFuture(null), + bundle, unloadDecision, 5, TimeUnit.SECONDS); + inFlightUnloadRequestMap = getInFlightUnloadRequestMap(manager); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, true, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 1); + manager.handleEvent(bundle, + new ServiceUnitStateData(ServiceUnitState.Free, dstBroker, srcBroker, false, VERSION_ID_INIT), null); + assertEquals(inFlightUnloadRequestMap.size(), 0); + future.get(); + assertEquals(counter.getBreakdownCounters().get(Success).get(Admin).get(), 3); + + } @Test