From d41117c983f82756b0ec06d0e53a0d30f414930c Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Sun, 24 Dec 2023 14:37:39 -0800 Subject: [PATCH] [improve][broker] check if the owner is active when returning ownership (ExtensibleLoadManager) --- .../ExtensibleLoadManagerWrapper.java | 2 - .../channel/ServiceUnitStateChannelImpl.java | 35 +++++++--- .../channel/ServiceUnitStateChannelTest.java | 68 +++++++++++++++++++ 3 files changed, 95 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 18e949537dedb..cd1561cb70e2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() { @Override public void writeLoadReportOnZookeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override public void writeResourceQuotasToZooKeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 713d98b72507e..50fa412ebc979 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -487,6 +487,30 @@ public boolean isOwner(String serviceUnit) { return isOwner(serviceUnit, lookupServiceAddress); } + private CompletableFuture> getActiveOwnerAsync( + String serviceUnit, + ServiceUnitState state, + Optional owner) { + CompletableFuture> activeOwner = owner.isPresent() + ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner)) + : CompletableFuture.completedFuture(Optional.empty()); + + + return activeOwner + .thenCompose(broker -> broker + .map(__ -> activeOwner) + .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply( + ownerAfterDeferred -> ownerAfterDeferred == null ? Optional.empty() + : Optional.of(ownerAfterDeferred)))) + .whenComplete((__, e) -> { + if (e != null) { + log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}", + serviceUnit, state, owner, e); + ownerLookUpCounters.get(state).getFailure().incrementAndGet(); + } + }); + } + public CompletableFuture> getOwnerAsync(String serviceUnit) { if (!validateChannelState(Started, true)) { return CompletableFuture.failedFuture( @@ -498,18 +522,13 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { ownerLookUpCounters.get(state).getTotal().incrementAndGet(); switch (state) { case Owned -> { - return CompletableFuture.completedFuture(Optional.of(data.dstBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker())); } case Splitting -> { - return CompletableFuture.completedFuture(Optional.of(data.sourceBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker())); } case Assigning, Releasing -> { - return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> { - if (e != null) { - ownerLookUpCounters.get(state).getFailure().incrementAndGet(); - } - }).thenApply( - broker -> broker == null ? Optional.empty() : Optional.of(broker)); + return getActiveOwnerAsync(serviceUnit, state, Optional.empty()); } case Init, Free -> { return CompletableFuture.completedFuture(Optional.empty()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index f99a167ff4883..fb2dc239561ed 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -1560,6 +1561,73 @@ public void testOverrideOrphanStateData() cleanTableViews(); } + @Test(priority = 19) + public void testActiveGetOwner() + throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + + + // set the bundle owner is the broker + String broker = lookupServiceAddress2; + String bundle = "public/owned/0xfffffff0_0xffffffff"; + overrideTableViews(bundle, + new ServiceUnitStateData(Owned, broker, null, 1)); + var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get(); + assertEquals(owner, broker); + + // simulate the owner is inactive + var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(spyRegistry).lookupAsync(eq(broker)); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", spyRegistry , true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 1000, true); + + + // verify getOwnerAsync times out because the owner is inactive now. + long start = System.currentTimeMillis(); + try { + channel1.getOwnerAsync(bundle).get(); + fail(); + } catch (Exception e) { + if (e.getCause() instanceof TimeoutException) { + // expected + } else { + fail(); + } + } + assertTrue(System.currentTimeMillis() - start >= 1000); + + // simulate ownership cleanup by the leader channel + doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))) + .when(loadManager).selectAsync(any(), any()); + var leaderChannel = channel1; + String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + assertEquals(leader, leader2); + if (leader.equals(lookupServiceAddress2)) { + leaderChannel = channel2; + } + leaderChannel.handleMetadataSessionEvent(SessionReestablished); + FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", + System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + // verify the ownership cleanup, and channel's getOwnerAsync returns without timeout + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 10 * 1000, true); + start = System.currentTimeMillis(); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get()); + assertTrue(System.currentTimeMillis() - start < 10_000); + + // test clean-up + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", registry , true); + cleanTableViews(); + + } private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException {