Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Sep 24, 2024
1 parent 38e0fe9 commit 322b9cc
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -1677,16 +1679,16 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
admin.topics().unload(topic);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
AtomicLong failedLoadTopic1 = new AtomicLong(0);
AtomicLong concurrencyLoadTopicAndUnloadBundle1 = new AtomicLong(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);
failedLoadTopic1.set(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.set(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
Expand All @@ -1706,8 +1708,8 @@ public void testMetricsPersistentTopicLoadFails() throws Exception {
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 > failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 == concurrencyLoadTopicAndUnloadBundle1.getValue());
assertTrue(failedLoadTopic2 > failedLoadTopic1.get());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 == concurrencyLoadTopicAndUnloadBundle1.get());
});

// Remove the injection.
Expand All @@ -1728,16 +1730,16 @@ public void testMetricsPersistentTopicLoadFailsDueToBundleUnloading() throws Exc
admin.namespaces().unload(namespace);

// Get original counter.
MutableInt failedLoadTopic1 = new MutableInt(0);
MutableInt concurrencyLoadTopicAndUnloadBundle1 = new MutableInt(0);
AtomicLong failedLoadTopic1 = new AtomicLong(0);
AtomicLong concurrencyLoadTopicAndUnloadBundle1 = new AtomicLong(0);
JerseyClient httpClient = JerseyClientBuilder.createClient();
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
long failedLoadTopic = parseLongMetric(response, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle =
parseLongMetric(response, "pulsar_concurrency_load_topic_and_unload_bundle_count");
failedLoadTopic1.setValue(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.setValue(concurrencyLoadTopicAndUnloadBundle);
failedLoadTopic1.set(failedLoadTopic);
concurrencyLoadTopicAndUnloadBundle1.set(concurrencyLoadTopicAndUnloadBundle);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
Expand All @@ -1761,8 +1763,8 @@ public void testMetricsPersistentTopicLoadFailsDueToBundleUnloading() throws Exc
long failedLoadTopic2 = parseLongMetric(response2, "pulsar_topic_load_failed_count");
long concurrencyLoadTopicAndUnloadBundle2 =
parseLongMetric(response2, "pulsar_concurrency_load_topic_and_unload_bundle_count");
assertTrue(failedLoadTopic2 == failedLoadTopic1.getValue());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 > concurrencyLoadTopicAndUnloadBundle1.getValue());
assertTrue(failedLoadTopic2 == failedLoadTopic1.get());
assertTrue(concurrencyLoadTopicAndUnloadBundle2 > concurrencyLoadTopicAndUnloadBundle1.get());
});

// Remove the injection.
Expand Down

0 comments on commit 322b9cc

Please sign in to comment.