diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index da4cee7b4651c..e397dbb64a075 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -388,7 +388,11 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse, asyncResponse.resume(Response.ok("ok").build()); }).exceptionally(ex -> { if (!isRedirectException(ex)) { - LOG.error("[{}] Fail to run health check.", clientAppId(), ex); + if (isNotFoundException(ex)) { + LOG.warn("[{}] Failed to run health check: {}", clientAppId(), ex.getMessage()); + } else { + LOG.error("[{}] Failed to run health check.", clientAppId(), ex); + } } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 49d038d512e59..1c552e208eab0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -87,6 +87,7 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -1291,6 +1292,14 @@ private void handleBrokerCreationEvent(String broker) { broker, cleanupJobs.size()); } } + }) + .exceptionally(e -> { + if (FutureUtil.unwrapCompletionException(e) instanceof PulsarAdminException.NotFoundException) { + log.warn("{} Failed to run health check: {}", broker, e.getMessage()); + } else { + log.error("{} Failed to run health check", broker, e); + } + return null; }); } } @@ -1323,12 +1332,19 @@ private void handleBrokerDeletionEvent(String broker) { } } + private boolean channelDisabled() { + final var channelState = this.channelState; + if (channelState == Disabled || channelState == Closed) { + log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); + return true; + } + return false; + } + private void scheduleCleanup(String broker, long delayInSecs) { var scheduled = new MutableObject>(); try { - final var channelState = this.channelState; - if (channelState == Disabled || channelState == Closed) { - log.warn("[{}] Skip scheduleCleanup because the state is {} now", brokerId, channelState); + if (channelDisabled()) { return; } cleanupJobs.computeIfAbsent(broker, k -> { @@ -1462,6 +1478,10 @@ private CompletableFuture healthCheckBrokerAsync(String brokerId) { } private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture future) { + if (channelDisabled()) { + future.complete(null); + return; + } try { var admin = getPulsarAdmin(); admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId)) @@ -1472,7 +1492,6 @@ private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, Com return; } if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) { - log.error("Failed health-check broker :{}", brokerId, e); future.completeExceptionally(FutureUtil.unwrapCompletionException(e)); } else { pulsar.getExecutor() @@ -1501,6 +1520,9 @@ private synchronized void doCleanup(String broker, boolean gracefully) { // if not gracefully, verify the broker is inactive by health-check. if (!gracefully) { + if (channelDisabled()) { + return; + } try { healthCheckBrokerAsync(broker).get( pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS); @@ -1508,6 +1530,10 @@ private synchronized void doCleanup(String broker, boolean gracefully) { broker); return; } catch (Exception e) { + if (e instanceof ExecutionException && e.getCause() instanceof PulsarAdminException.NotFoundException) { + log.info("The broker is not healthy, skip {}'s orphan bundle cleanup", broker); + return; + } if (debug()) { log.info("Failed to check broker:{} health", broker, e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java index ca44f6bc4d6d9..c8427d1a66d53 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerCloseTest.java @@ -38,7 +38,7 @@ import org.testng.annotations.Test; @Slf4j -@Test(groups = "flaky") +@Test(groups = "broker") public class ExtensibleLoadManagerCloseTest { private static final String clusterName = "test"; @@ -88,14 +88,18 @@ private ServiceConfiguration brokerConfig() { config.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName()); config.setLoadBalancerDebugModeEnabled(true); config.setBrokerShutdownTimeoutMs(100); + + // Reduce these timeout configs to avoid failed tests being blocked too long + config.setMetadataStoreOperationTimeoutSeconds(5); + config.setNamespaceBundleUnloadingTimeoutMs(5000); return config; } - @Test + @Test(invocationCount = 10) public void testCloseAfterLoadingBundles() throws Exception { setupBrokers(3); - final var topic = "test"; + final var topic = "test-" + System.currentTimeMillis(); final var admin = brokers.get(0).getAdminClient(); admin.topics().createPartitionedTopic(topic, 20); admin.lookups().lookupPartitionedTopic(topic);