diff --git a/conf/broker.conf b/conf/broker.conf index f64d08a1de88c..21893e2a5e48f 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -224,6 +224,10 @@ activeConsumerFailoverDelayTimeMillis=1000 # When it is 0, inactive subscriptions are not deleted automatically subscriptionExpirationTimeMinutes=0 +# How long to delete inactive subscriptions from last consuming for non-persistent topic. +# When it is 0, inactive subscriptions are not deleted automatically +nonPersistentSubscriptionExpirationTimeMinutes=0 + # Enable subscription message redelivery tracker to send redelivery count to consumer (default is enabled) subscriptionRedeliveryTrackerEnabled=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 4a9a6e47bf333..0598425d8c1aa 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -705,6 +705,13 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se + " When it is 0, inactive subscriptions are not deleted automatically" ) private int subscriptionExpirationTimeMinutes = 0; + @FieldContext( + category = CATEGORY_POLICIES, + doc = "How long to delete inactive subscriptions from last consuming for non-persistent topic." + + " When it is 0, inactive subscriptions are not deleted automatically", + minValue = 0 + ) + private int nonPersistentSubscriptionExpirationTimeMinutes = 0; @FieldContext( category = CATEGORY_POLICIES, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index cf46103cc357b..a9cc989050983 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1026,8 +1026,8 @@ public void checkInactiveSubscriptions() { .getPolicies(name.getNamespaceObject()) .orElseThrow(MetadataStoreException.NotFoundException::new); final int defaultExpirationTime = brokerService.pulsar().getConfiguration() - .getSubscriptionExpirationTimeMinutes(); - final Integer nsExpirationTime = policies.subscription_expiration_time_minutes; + .getNonPersistentSubscriptionExpirationTimeMinutes(); + final Integer nsExpirationTime = policies.non_persistent_subscription_expiration_time_minutes; final long expirationTimeMillis = TimeUnit.MINUTES .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 73a1084f30f2a..4bbc6c7c0bdb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -19,7 +19,9 @@ package org.apache.pulsar.broker.service.nonpersistent; import lombok.Cleanup; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -29,11 +31,16 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; +import org.awaitility.Awaitility; import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -45,6 +52,8 @@ public class NonPersistentTopicTest extends BrokerTestBase { @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { + conf.setNonPersistentSubscriptionExpirationTimeMinutes(1); + conf.setSubscriptionExpiryCheckIntervalInMinutes(1); super.baseSetup(); } @@ -119,4 +128,32 @@ public void testCreateNonExistentPartitions() throws PulsarAdminException, Pulsa } Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); } + + + @Test + public void testInactiveSubscriptionsAutoClean() throws Exception { + String topic = "non-persistent://prop/ns-abc/testInactiveSubscriptionsAutoClean"; + admin.topics().createNonPartitionedTopic(topic); + + String subName = "test_sub"; + + try (Consumer consumer = pulsarClient + .newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName) + .subscribe()) { + // ignore + } + + PulsarService pulsar = getPulsar(); + CompletableFuture> f = pulsar.getBrokerService().getTopic(topic, false); + Optional optional = f.get(); + + if (optional.isEmpty()) { + Assert.fail(); + } + + NonPersistentTopic topic1 = (NonPersistentTopic) optional.get(); + Awaitility.waitAtMost(2, TimeUnit.MINUTES).until(() -> topic1.getSubscription(subName) == null); + } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java index 066fdf1df4f09..0ccb6c02a1439 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/Policies.java @@ -62,6 +62,8 @@ public class Policies { @SuppressWarnings("checkstyle:MemberName") public Integer subscription_expiration_time_minutes = null; @SuppressWarnings("checkstyle:MemberName") + public Integer non_persistent_subscription_expiration_time_minutes = null; + @SuppressWarnings("checkstyle:MemberName") public RetentionPolicies retention_policies = null; public boolean deleted = false; public static final String FIRST_BOUNDARY = "0x00000000"; @@ -141,7 +143,8 @@ public int hashCode() { clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, autoSubscriptionCreationOverride, persistence, bundles, latency_stats_sample_rate, - message_ttl_in_seconds, subscription_expiration_time_minutes, retention_policies, + message_ttl_in_seconds, subscription_expiration_time_minutes, + non_persistent_subscription_expiration_time_minutes, retention_policies, encryption_required, delayed_delivery_policies, inactive_topic_policies, subscription_auth_mode, max_producers_per_topic, @@ -180,6 +183,8 @@ public boolean equals(Object obj) { && Objects.equals(message_ttl_in_seconds, other.message_ttl_in_seconds) && Objects.equals(subscription_expiration_time_minutes, other.subscription_expiration_time_minutes) + && Objects.equals(non_persistent_subscription_expiration_time_minutes, + other.non_persistent_subscription_expiration_time_minutes) && Objects.equals(retention_policies, other.retention_policies) && Objects.equals(encryption_required, other.encryption_required) && Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)