Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] [broker] Add broker health check status into prometheus metrics #20147

Merged
merged 1 commit into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1689,6 +1689,8 @@ exposePublisherStats=true
statsUpdateFrequencyInSecs=60
statsUpdateInitialDelayInSecs=60

healthCheckMetricsUpdateTimeInSeconds=-1

# Enable expose the precise backlog stats.
# Set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate.
# Default is false.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3280,6 +3280,12 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
doc = "Stats update initial delay in seconds"
)
private int statsUpdateInitialDelayInSecs = 60;
@FieldContext(
category = CATEGORY_METRICS,
minValue = -1,
doc = "HealthCheck update frequency in seconds. Disable health check with value -1 (Default value -1)"
)
private int healthCheckMetricsUpdateTimeInSeconds = -1;
@FieldContext(
category = CATEGORY_METRICS,
doc = "If true, aggregate publisher stats of PartitionedTopicStats by producerName"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.PulsarService.State;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
Expand Down Expand Up @@ -422,26 +423,35 @@ public static String getHeartbeatTopicName(String brokerId, ServiceConfiguration
}

private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
String brokerId = pulsar().getBrokerId();
return internalRunHealthCheck(topicVersion, pulsar(), clientAppId());
}


public static CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion, PulsarService pulsar,
String clientAppId) {
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration());
String brokerId = pulsar.getBrokerId();
final String topicName =
getHeartbeatTopicName(brokerId, pulsar().getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
getHeartbeatTopicName(brokerId, pulsar.getConfiguration(), (topicVersion == TopicVersion.V2));
LOG.info("[{}] Running healthCheck with topic={}", clientAppId, topicName);
final String messageStr = UUID.randomUUID().toString();
final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
return pulsar().getBrokerService().getTopic(topicName, true)
return pulsar.getBrokerService().getTopic(topicName, true)
.thenCompose(topicOptional -> {
if (!topicOptional.isPresent()) {
LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
clientAppId(), topicName);
clientAppId, topicName);
throw new RestException(Status.NOT_FOUND,
String.format("Topic [%s] not found after create.", topicName));
}
PulsarClient client;
try {
client = pulsar().getClient();
client = pulsar.getClient();
} catch (PulsarServerException e) {
LOG.error("[{}] Fail to run health check while get client.", clientAppId());
LOG.error("[{}] Fail to run health check while get client.", clientAppId);
throw new RestException(e);
}
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
Expand All @@ -451,17 +461,18 @@ private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion
.startMessageId(MessageId.latest)
.createAsync().exceptionally(createException -> {
producer.closeAsync().exceptionally(ex -> {
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
return null;
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
.thenCompose(__ -> FutureUtil.addTimeoutHandling(
healthCheckRecursiveReadNext(reader, messageStr),
HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
HEALTH_CHECK_READ_TIMEOUT, pulsar.getBrokerService().executor(),
() -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName,
clientAppId)
.whenComplete((unused, innerEx) -> {
if (ex != null) {
resultFuture.completeExceptionally(ex);
Expand All @@ -479,6 +490,11 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
});
}

private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
return closeAndReCheck(producer, reader, topic, subscriptionName, clientAppId());
}

/**
* Close producer and reader and then to re-check if this operation is success.
*
Expand All @@ -491,8 +507,8 @@ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
* @param topic Topic
* @param subscriptionName Subscription name
*/
private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName) {
private static CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
Topic topic, String subscriptionName, String clientAppId) {
// no matter exception or success, we still need to
// close producer/reader
CompletableFuture<Void> producerFuture = producer.closeAsync();
Expand All @@ -503,32 +519,32 @@ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reade
return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
.exceptionally(closeException -> {
if (readerFuture.isCompletedExceptionally()) {
LOG.error("[{}] Close reader fail while heath check.", clientAppId());
LOG.error("[{}] Close reader fail while heath check.", clientAppId);
Subscription subscription =
topic.getSubscription(subscriptionName);
// re-check subscription after reader close
if (subscription != null) {
LOG.warn("[{}] Force delete subscription {} "
+ "when it still exists after the"
+ " reader is closed.",
clientAppId(), subscription);
clientAppId, subscription);
subscription.deleteForcefully()
.exceptionally(ex -> {
LOG.error("[{}] Force delete subscription fail"
+ " while health check",
clientAppId(), ex);
clientAppId, ex);
return null;
});
}
} else {
// producer future fail.
LOG.error("[{}] Close producer fail while heath check.", clientAppId());
LOG.error("[{}] Close producer fail while heath check.", clientAppId);
}
return null;
});
}

private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
private static CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
return reader.readNextAsync()
.thenCompose(msg -> {
if (!Objects.equals(content, msg.getValue())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY;
import static org.apache.commons.collections4.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.impl.BrokersBase.internalRunHealthCheck;
import static org.apache.pulsar.client.util.RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.client.util.RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
import static org.apache.pulsar.common.naming.SystemTopicNames.isTransactionInternalName;
Expand Down Expand Up @@ -157,6 +158,7 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
Expand Down Expand Up @@ -241,6 +243,7 @@ public class BrokerService implements Closeable {

private AuthorizationService authorizationService;
private final ScheduledExecutorService statsUpdater;

@Getter
private final ScheduledExecutorService backlogQuotaChecker;

Expand Down Expand Up @@ -346,6 +349,7 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(
pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory);
this.workerGroup = eventLoopGroup;

this.statsUpdater = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-stats-updater")
.numThreads(1)
Expand Down Expand Up @@ -611,6 +615,7 @@ public void start() throws Exception {
this.startStatsUpdater(
serviceConfig.getStatsUpdateInitialDelayInSecs(),
serviceConfig.getStatsUpdateFrequencyInSecs());
this.initializeHealthChecker();
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
Expand Down Expand Up @@ -640,6 +645,24 @@ protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpd
updateRates();
}

protected void initializeHealthChecker() {
ServiceConfiguration config = pulsar().getConfiguration();
if (config.getHealthCheckMetricsUpdateTimeInSeconds() > 0) {
int interval = config.getHealthCheckMetricsUpdateTimeInSeconds();
statsUpdater.scheduleAtFixedRate(this::checkHealth,
interval, interval, TimeUnit.SECONDS);
}
}

public CompletableFuture<Void> checkHealth() {
return internalRunHealthCheck(TopicVersion.V2, pulsar(), null).thenAccept(__ -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusSuccess();
}).exceptionally(ex -> {
this.pulsarStats.getBrokerOperabilityMetrics().recordHealthCheckStatusFail();
return null;
});
}

protected void startDeduplicationSnapshotMonitor() {
// We do not know whether users will enable deduplication on namespace level/topic level or not, so keep this
// scheduled task runs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
private final LongAdder connectionTotalCreatedCount;
private final LongAdder connectionTotalClosedCount;
private final LongAdder connectionActive;
private volatile int healthCheckStatus; // 1=success, 0=failure, -1=unknown

private final LongAdder connectionCreateSuccessCount;
private final LongAdder connectionCreateFailCount;
Expand All @@ -61,7 +62,7 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
this.connectionTotalCreatedCount = new LongAdder();
this.connectionTotalClosedCount = new LongAdder();
this.connectionActive = new LongAdder();

this.healthCheckStatus = -1;
this.connectionCreateSuccessCount = new LongAdder();
this.connectionCreateFailCount = new LongAdder();

Expand Down Expand Up @@ -103,6 +104,7 @@ private void generate() {
reset();
metricsList.add(getTopicLoadMetrics());
metricsList.add(getConnectionMetrics());
metricsList.add(getHealthMetrics());
}

public Metrics generateConnectionMetrics() {
Expand All @@ -119,6 +121,12 @@ Metrics getConnectionMetrics() {
return rMetrics;
}

Metrics getHealthMetrics() {
Metrics rMetrics = Metrics.create(getDimensionMap("broker_health"));
rMetrics.put("brk_health", healthCheckStatus);
return rMetrics;
}

Map<String, String> getDimensionMap(String metricsName) {
Map<String, String> dimensionMap = new HashMap<>();
dimensionMap.put("broker", brokerName);
Expand Down Expand Up @@ -179,4 +187,12 @@ public void recordConnectionCreateSuccess() {
public void recordConnectionCreateFail() {
this.connectionCreateFailCount.increment();
}

public void recordHealthCheckStatusSuccess() {
this.healthCheckStatus = 1;
}

public void recordHealthCheckStatusFail() {
this.healthCheckStatus = 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1607,6 +1607,38 @@ public void testBrokerConnectionStats() throws Exception {
assertEquals((long) map.get("brk_connection_create_fail_count"), 1);
}

/**
* There is detailed info about this test.
* see: https://github.com/apache/pulsar/issues/10150#issuecomment-1112380074
*/
@Test
public void testBrokerHealthCheckStatus() throws Exception {

cleanup();
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
setup();
BrokerService brokerService = this.pulsar.getBrokerService();

Map<String, Object> map = null;

brokerService.checkHealth().get();
brokerService.updateRates();
Awaitility.await().until(() -> this.activeCount.get() == 1);
List<Metrics> metrics = brokerService.getTopicMetrics();
System.out.println(metrics);

for (int i = 0; i < metrics.size(); i++) {
if (metrics.get(i).getDimensions().containsValue("broker_health")) {
map = metrics.get(i).getMetrics();
break;
}
}
assertNotNull(map);
assertEquals(map.get("brk_health"), 1);
}

@Test
public void testPayloadCorruptionDetection() throws Exception {
final String topicName = "persistent://prop/ns-abc/topic1";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pulsar.broker.loadbalance.extensions.manager.UnloadManager;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
Expand Down Expand Up @@ -1789,6 +1790,20 @@ public void testBrokerConnection() throws Exception {
compareBrokerConnectionStateCount(cm, 2.0);
}

@Test
public void testBrokerHealthCheckMetric() throws Exception {
conf.setHealthCheckMetricsUpdateTimeInSeconds(60);
BrokerService brokerService = pulsar.getBrokerService();
brokerService.checkHealth().get();
brokerService.updateRates();
ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
PrometheusMetricsTestUtil.generate(pulsar, true, false, false, statsOut);
String metricsStr = statsOut.toString();
Multimap<String, Metric> metrics = parseMetrics(metricsStr);
List<Metric> cm = (List<Metric>) metrics.get("pulsar_health");
compareBrokerConnectionStateCount(cm, 1);
}

private void compareBrokerConnectionStateCount(List<Metric> cm, double count) {
assertEquals(cm.size(), 1);
assertEquals(cm.get(0).tags.get("cluster"), "test");
Expand Down Expand Up @@ -1894,7 +1909,6 @@ public void testMetricsWithCache() throws Throwable {
PrometheusMetricsGenerator prometheusMetricsGenerator =
new PrometheusMetricsGenerator(pulsar, true, false, false,
false, clock);

String previousMetrics = null;
for (int a = 0; a < 4; a++) {
ByteArrayOutputStream statsOut1 = new ByteArrayOutputStream();
Expand All @@ -1908,7 +1922,6 @@ public void testMetricsWithCache() throws Throwable {
assertEquals(metricsStr1, metricsStr2);
assertNotEquals(metricsStr1, previousMetrics);
previousMetrics = metricsStr1;

// move time forward
currentTimeMillis.addAndGet(TimeUnit.SECONDS.toMillis(2));
}
Expand Down
Loading