diff --git a/conf/broker.conf b/conf/broker.conf index 617e202e5ec65..e745fcb2b0a8f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1689,6 +1689,8 @@ exposePublisherStats=true statsUpdateFrequencyInSecs=60 statsUpdateInitialDelayInSecs=60 +healthCheckMetricsUpdateTimeInSeconds=-1 + # Enable expose the precise backlog stats. # Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. # Default is false. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 58d6444e7196a..81073b1731b24 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -3280,6 +3280,12 @@ public double getLoadBalancerBandwidthOutResourceWeight() { doc = "Stats update initial delay in seconds" ) private int statsUpdateInitialDelayInSecs = 60; + @FieldContext( + category = CATEGORY_METRICS, + minValue = -1, + doc = "HealthCheck update frequency in seconds. Disable health check with value -1 (Default value -1)" + ) + private int healthCheckMetricsUpdateTimeInSeconds = -1; @FieldContext( category = CATEGORY_METRICS, doc = "If true, aggregate publisher stats of PartitionedTopicStats by producerName" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index e13cb1858f79d..da4cee7b4651c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -51,6 +51,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.PulsarService.State; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; @@ -422,26 +423,35 @@ public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration } private CompletableFuture internalRunHealthCheck(TopicVersion topicVersion) { - String brokerId = pulsar().getBrokerId(); + return internalRunHealthCheck(topicVersion, pulsar(), clientAppId()); + } + + + public static CompletableFuture internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar, + String clientAppId) { + NamespaceName namespaceName = (topicVersion == TopicVersion.V2) + ? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()) + : NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration()); + String brokerId = pulsar.getBrokerId(); final String topicName = - getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2)); - LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName); + getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2)); + LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName); final String messageStr = UUID.randomUUID().toString(); final String subscriptionName = "healthCheck-" + messageStr; // create non-partitioned topic manually and close the previous reader if present. - return pulsar().getBrokerService().getTopic(topicName, true) + return pulsar.getBrokerService().getTopic(topicName, true) .thenCompose(topicOptional -> { if (!topicOptional.isPresent()) { LOG.error("[{}] Fail to run health check while get topic {}. because get null value.", - clientAppId(), topicName); + clientAppId, topicName); throw new RestException(Status.NOT_FOUND, String.format("Topic [%s] not found after create.", topicName)); } PulsarClient client; try { - client = pulsar().getClient(); + client = pulsar.getClient(); } catch (PulsarServerException e) { - LOG.error("[{}] Fail to run health check while get client.", clientAppId()); + LOG.error("[{}] Fail to run health check while get client.", clientAppId); throw new RestException(e); } CompletableFuture resultFuture = new CompletableFuture<>(); @@ -451,17 +461,18 @@ private CompletableFuture internalRunHealthCheck(TopicVersion topicVersion .startMessageId(MessageId.latest) .createAsync().exceptionally(createException -> { producer.closeAsync().exceptionally(ex -> { - LOG.error("[{}] Close producer fail while heath check.", clientAppId()); + LOG.error("[{}] Close producer fail while heath check.", clientAppId); return null; }); throw FutureUtil.wrapToCompletionException(createException); }).thenCompose(reader -> producer.sendAsync(messageStr) .thenCompose(__ -> FutureUtil.addTimeoutHandling( healthCheckRecursiveReadNext(reader, messageStr), - HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(), + HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(), () -> HEALTH_CHECK_TIMEOUT_EXCEPTION)) .whenComplete((__, ex) -> { - closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName) + closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName, + clientAppId) .whenComplete((unused, innerEx) -> { if (ex != null) { resultFuture.completeExceptionally(ex); @@ -479,6 +490,11 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(), }); } + private CompletableFuture closeAndReCheck(Producer producer, Reader reader, + Topic topic, String subscriptionName) { + return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId()); + } + /** * Close producer and reader and then to re-check if this operation is success. * @@ -491,8 +507,8 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(), * @param topic Topic * @param subscriptionName Subscription name */ - private CompletableFuture closeAndReCheck(Producer producer, Reader reader, - Topic topic, String subscriptionName) { + private static CompletableFuture closeAndReCheck(Producer producer, Reader reader, + Topic topic, String subscriptionName, String clientAppId) { // no matter exception or success, we still need to // close producer/reader CompletableFuture producerFuture = producer.closeAsync(); @@ -503,7 +519,7 @@ private CompletableFuture closeAndReCheck(Producer producer, Reade return FutureUtil.waitForAll(Collections.unmodifiableList(futures)) .exceptionally(closeException -> { if (readerFuture.isCompletedExceptionally()) { - LOG.error("[{}] Close reader fail while heath check.", clientAppId()); + LOG.error("[{}] Close reader fail while heath check.", clientAppId); Subscription subscription = topic.getSubscription(subscriptionName); // re-check subscription after reader close @@ -511,24 +527,24 @@ private CompletableFuture closeAndReCheck(Producer producer, Reade LOG.warn("[{}] Force delete subscription {} " + "when it still exists after the" + " reader is closed.", - clientAppId(), subscription); + clientAppId, subscription); subscription.deleteForcefully() .exceptionally(ex -> { LOG.error("[{}] Force delete subscription fail" + " while health check", - clientAppId(), ex); + clientAppId, ex); return null; }); } } else { // producer future fail. - LOG.error("[{}] Close producer fail while heath check.", clientAppId()); + LOG.error("[{}] Close producer fail while heath check.", clientAppId); } return null; }); } - private CompletableFuture healthCheckRecursiveReadNext(Reader reader, String content) { + private static CompletableFuture healthCheckRecursiveReadNext(Reader reader, String content) { return reader.readNextAsync() .thenCompose(msg -> { if (!Objects.equals(content, msg.getValue())) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index dd722dffcfbfc..c240c758dcda6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck; import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX; import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX; import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName; @@ -157,6 +158,7 @@ import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.naming.TopicVersion; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; @@ -241,6 +243,7 @@ public class BrokerService implements Closeable { private AuthorizationService authorizationService; private final ScheduledExecutorService statsUpdater; + @Getter private final ScheduledExecutorService backlogQuotaChecker; @@ -346,6 +349,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws this.acceptorGroup = EventLoopUtil.newEventLoopGroup( pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory); this.workerGroup = eventLoopGroup; + this.statsUpdater = OrderedScheduler.newSchedulerBuilder() .name("pulsar-stats-updater") .numThreads(1) @@ -611,6 +615,7 @@ public void start() throws Exception { this.startStatsUpdater( serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs()); + this.initializeHealthChecker(); this.startInactivityMonitor(); this.startMessageExpiryMonitor(); this.startCompactionMonitor(); @@ -640,6 +645,24 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd updateRates(); } + protected void initializeHealthChecker() { + ServiceConfiguration config = pulsar().getConfiguration(); + if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) { + int interval = config.getHealthCheckMetricsUpdateTimeInSeconds(); + statsUpdater.scheduleAtFixedRate(this::checkHealth, + interval, interval, TimeUnit.SECONDS); + } + } + + public CompletableFuture checkHealth() { + return internalRunHealthCheck(TopicVersion.V2, pulsar(), null).thenAccept(__ -> { + this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess(); + }).exceptionally(ex -> { + this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail(); + return null; + }); + } + protected void startDeduplicationSnapshotMonitor() { // We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this // scheduled task runs. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 3f991be8184ab..1855e1798b465 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -42,6 +42,7 @@ public class BrokerOperabilityMetrics implements AutoCloseable { private final LongAdder connectionTotalCreatedCount; private final LongAdder connectionTotalClosedCount; private final LongAdder connectionActive; + private volatile int healthCheckStatus; // 1=success, 0=failure, -1=unknown private final LongAdder connectionCreateSuccessCount; private final LongAdder connectionCreateFailCount; @@ -61,7 +62,7 @@ public BrokerOperabilityMetrics(PulsarService pulsar) { this.connectionTotalCreatedCount = new LongAdder(); this.connectionTotalClosedCount = new LongAdder(); this.connectionActive = new LongAdder(); - + this.healthCheckStatus = -1; this.connectionCreateSuccessCount = new LongAdder(); this.connectionCreateFailCount = new LongAdder(); @@ -103,6 +104,7 @@ private void generate() { reset(); metricsList.add(getTopicLoadMetrics()); metricsList.add(getConnectionMetrics()); + metricsList.add(getHealthMetrics()); } public Metrics generateConnectionMetrics() { @@ -119,6 +121,12 @@ Metrics getConnectionMetrics() { return rMetrics; } + Metrics getHealthMetrics() { + Metrics rMetrics = Metrics.create(getDimensionMap("broker_health")); + rMetrics.put("brk_health", healthCheckStatus); + return rMetrics; + } + Map getDimensionMap(String metricsName) { Map dimensionMap = new HashMap<>(); dimensionMap.put("broker", brokerName); @@ -179,4 +187,12 @@ public void recordConnectionCreateSuccess() { public void recordConnectionCreateFail() { this.connectionCreateFailCount.increment(); } + + public void recordHealthCheckStatusSuccess() { + this.healthCheckStatus = 1; + } + + public void recordHealthCheckStatusFail() { + this.healthCheckStatus = 0; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 640cd2d37e399..36e741f8fa9cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1607,6 +1607,38 @@ public void testBrokerConnectionStats() throws Exception { assertEquals((long) map.get("brk_connection_create_fail_count"), 1); } + /** + * There is detailed info about this test. + * see: https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074 + */ + @Test + public void testBrokerHealthCheckStatus() throws Exception { + + cleanup(); + conf.setSystemTopicEnabled(false); + conf.setTopicLevelPoliciesEnabled(false); + conf.setHealthCheckMetricsUpdateTimeInSeconds(60); + setup(); + BrokerService brokerService = this.pulsar.getBrokerService(); + + Map map = null; + + brokerService.checkHealth().get(); + brokerService.updateRates(); + Awaitility.await().until(() -> this.activeCount.get() == 1); + List metrics = brokerService.getTopicMetrics(); + System.out.println(metrics); + + for (int i = 0; i < metrics.size(); i++) { + if (metrics.get(i).getDimensions().containsValue("broker_health")) { + map = metrics.get(i).getMetrics(); + break; + } + } + assertNotNull(map); + assertEquals(map.get("brk_health"), 1); + } + @Test public void testPayloadCorruptionDetection() throws Exception { final String topicName = "persistent://prop/ns-abc/topic1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java index a92f5a4acc208..fa073d3694b26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java @@ -79,6 +79,7 @@ import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.service.AbstractTopic; +import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; @@ -1789,6 +1790,20 @@ public void testBrokerConnection() throws Exception { compareBrokerConnectionStateCount(cm, 2.0); } + @Test + public void testBrokerHealthCheckMetric() throws Exception { + conf.setHealthCheckMetricsUpdateTimeInSeconds(60); + BrokerService brokerService = pulsar.getBrokerService(); + brokerService.checkHealth().get(); + brokerService.updateRates(); + ByteArrayOutputStream statsOut = new ByteArrayOutputStream(); + PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut); + String metricsStr = statsOut.toString(); + Multimap metrics = parseMetrics(metricsStr); + List cm = (List) metrics.get("pulsar_health"); + compareBrokerConnectionStateCount(cm, 1); + } + private void compareBrokerConnectionStateCount(List cm, double count) { assertEquals(cm.size(), 1); assertEquals(cm.get(0).tags.get("cluster"), "test"); @@ -1894,7 +1909,6 @@ public void testMetricsWithCache() throws Throwable { PrometheusMetricsGenerator prometheusMetricsGenerator = new PrometheusMetricsGenerator(pulsar, true, false, false, false, clock); - String previousMetrics = null; for (int a = 0; a < 4; a++) { ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream(); @@ -1908,7 +1922,6 @@ public void testMetricsWithCache() throws Throwable { assertEquals(metricsStr1, metricsStr2); assertNotEquals(metricsStr1, previousMetrics); previousMetrics = metricsStr1; - // move time forward currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2)); }