Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] [fix] [broker] Do not record a bundle-unloading into the topic load failed metrics #23334

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,11 @@ public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<V
private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) {
CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>();
topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
if (t instanceof BrokerServiceException.BundleUnloadingException) {
pulsarStats.recordConcurrencyLoadTopicAndUnloadBundle();
} else {
pulsarStats.recordTopicLoadFailed();
}
pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture));
return null;
});
Expand Down Expand Up @@ -1553,7 +1557,11 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
() -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION);

topicFuture.exceptionally(t -> {
pulsarStats.recordTopicLoadFailed();
if (t instanceof BrokerServiceException.BundleUnloadingException) {
pulsarStats.recordConcurrencyLoadTopicAndUnloadBundle();
} else {
pulsarStats.recordTopicLoadFailed();
}
return null;
});

Expand Down Expand Up @@ -2226,7 +2234,7 @@ public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
+ "Please redo the lookup. Request is denied: namespace=%s",
topic, pulsar.getBrokerId(), topicName.getNamespace());
log.warn(msg);
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg));
return FutureUtil.failedFuture(new BrokerServiceException.BundleUnloadingException(msg));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public ServiceUnitNotReadyException(String msg, Throwable t) {
}
}

public static class BundleUnloadingException extends ServiceUnitNotReadyException {
public BundleUnloadingException(String msg) {
super(msg);
}

public BundleUnloadingException(String msg, Throwable t) {
super(msg, t);
}
}

public static class TopicClosedException extends BrokerServiceException {
public TopicClosedException(Throwable t) {
super(t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,10 @@ public void recordTopicLoadFailed() {
brokerOperabilityMetrics.recordTopicLoadFailed();
}

public void recordConcurrencyLoadTopicAndUnloadBundle() {
brokerOperabilityMetrics.recordConcurrencyLoadTopicAndUnloadBundle();
}

public void recordConnectionCreate() {
brokerOperabilityMetrics.recordConnectionCreate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
*/
public class BrokerOperabilityMetrics implements AutoCloseable {
private static final Counter TOPIC_LOAD_FAILED = Counter.build("topic_load_failed", "-").register();
private static final Counter CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE =
Counter.build("concurrency_load_topic_and_unload_bundle", "-").register();
private final List<Metrics> metricsList;
private final String localCluster;
private final DimensionStats topicLoadStats;
Expand Down Expand Up @@ -130,6 +132,8 @@ Map<String, String> getDimensionMap(String metricsName) {
Metrics getTopicLoadMetrics() {
Metrics metrics = getDimensionMetrics("pulsar_topic_load_times", "topic_load", topicLoadStats);
metrics.put("brk_topic_load_failed_count", TOPIC_LOAD_FAILED.get());
metrics.put("brk_concurrency_load_topic_and_unload_bundle_count",
CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE.get());
return metrics;
}

Expand Down Expand Up @@ -162,6 +166,10 @@ public void recordTopicLoadFailed() {
this.TOPIC_LOAD_FAILED.inc();
}

public void recordConcurrencyLoadTopicAndUnloadBundle() {
this.CONCURRENCY_LOAD_TOPIC_AND_UNLOAD_BUNDLE.inc();
}

public void recordConnectionCreate() {
this.connectionTotalCreatedCount.increment();
this.connectionActive.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -66,6 +67,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -1674,6 +1676,18 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.topics().createNonPartitionedTopic(topic);
admin.topics().unload(topic);

// Get original counter.
final AtomicLong failedLoadTopic1 = new AtomicLong(0);
final AtomicLong concurrencyLoadTopicAndUnloadBundle1 = new AtomicLong(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.set(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.set(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
Expand All @@ -1686,19 +1700,14 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {

// Do test
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
Multimap<String, PrometheusMetricsClient.Metric> metricMap = PrometheusMetricsClient.parseMetrics(response);
if (!metricMap.containsKey("pulsar_topic_load_failed_count")) {
return false;
}
double topic_load_failed_count = 0;
for (PrometheusMetricsClient.Metric metric : metricMap.get("pulsar_topic_load_failed_count")) {
topic_load_failed_count += metric.value;
}
return topic_load_failed_count >= 1D;
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 > failedLoadTopic1.get());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 == concurrencyLoadTopicAndUnloadBundle1.get());
});

// Remove the injection.
Expand All @@ -1710,6 +1719,74 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testMetricsPersistentTopicLoadFailsDueToBundleUnloading() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);
admin.namespaces().unload(namespace);

// Get original counter.
final AtomicLong failedLoadTopic1 = new AtomicLong(0);
final AtomicLong concurrencyLoadTopicAndUnloadBundle1 = new AtomicLong(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.set(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.set(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
if (failMarker.get() && op.equals(MockZooKeeper.Op.CREATE) &&
path.startsWith("/namespace/" + namespace)) {
return true;
}
return false;
});

// Do test
try {
pulsar.getBrokerService().loadOrCreatePersistentTopic(topic, true, Collections.emptyMap(), null).join();
} catch (Exception ex) {
// ignore, because we injected an error above.
}
Awaitility.await().untilAsserted(() -> {
String response2 = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 == failedLoadTopic1.get());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 > concurrencyLoadTopicAndUnloadBundle1.get());
});

// Remove the injection.
failMarker.set(false);
// cleanup.
httpClient.close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

private long parseLongMetric(String metricsResponse, String metricName) {
Multimap<String, PrometheusMetricsClient.Metric> metricMap =
PrometheusMetricsClient.parseMetrics(metricsResponse);
if (!metricMap.containsKey(metricName)) {
return 0;
}
double counter = 0;
for (PrometheusMetricsClient.Metric metric :
metricMap.get(metricName)) {
counter += metric.value;
}
return Double.valueOf(counter).longValue();
}

@Test
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
Expand Down
Loading