diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index 0741f06df8fa9..4dbee1922cdea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -32,6 +32,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.TopicName; + /** * Provider is for MLPendingAckStore. */ @@ -47,40 +48,68 @@ public CompletableFuture newPendingAckStore(PersistentSubscript new TransactionPendingAckStoreProviderException("The subscription is null.")); return pendingAckStoreFuture; } - PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic(); String pendingAckTopicName = MLPendingAckStore .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName()); - originPersistentTopic.getBrokerService().getManagedLedgerFactory() - .asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(), - originPersistentTopic.getManagedLedger().getConfig(), - new AsyncCallbacks.OpenLedgerCallback() { - @Override - public void openLedgerComplete(ManagedLedger ledger, Object ctx) { - ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(), - InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { - @Override - public void openCursorComplete(ManagedCursor cursor, Object ctx) { - pendingAckStoreFuture - .complete(new MLPendingAckStore(ledger, cursor, - subscription.getCursor())); - } + .asyncExists(TopicName.get(pendingAckTopicName) + .getPersistenceNamingEncoding()).thenAccept(exist -> { + TopicName topicName; + if (exist) { + topicName = TopicName.get(pendingAckTopicName); + } else { + topicName = TopicName.get(originPersistentTopic.getName()); + } + originPersistentTopic.getBrokerService() + .getManagedLedgerConfig(topicName).thenAccept(config -> { + config.setCreateIfMissing(true); + originPersistentTopic.getBrokerService().getManagedLedgerFactory() + .asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(), + config, new AsyncCallbacks.OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger.asyncOpenCursor( + MLPendingAckStore.getTransactionPendingAckStoreCursorName(), + InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + pendingAckStoreFuture + .complete(new MLPendingAckStore(ledger, cursor, + subscription.getCursor())); + if (log.isDebugEnabled()) { + log.debug("{},{} open MLPendingAckStore cursor success", + originPersistentTopic.getName(), + subscription.getName()); + } + } - @Override - public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - log.error("Open MLPendingAckStore cursor failed.", exception); - pendingAckStoreFuture.completeExceptionally(exception); - } - }, null); - } + @Override + public void openCursorFailed(ManagedLedgerException exception, + Object ctx) { + log.error("{},{} open MLPendingAckStore cursor failed." + , originPersistentTopic.getName(), + subscription.getName(), exception); + pendingAckStoreFuture.completeExceptionally(exception); + } + }, null); + } - @Override - public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - log.error("Open MLPendingAckStore managedLedger failed.", exception); - pendingAckStoreFuture.completeExceptionally(exception); - } - }, () -> true, null); + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + log.error("{}, {} open MLPendingAckStore managedLedger failed." + , originPersistentTopic.getName(), subscription.getName(), exception); + pendingAckStoreFuture.completeExceptionally(exception); + } + }, () -> true, null); + }); + }).exceptionally(e -> { + log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : " + + originPersistentTopic.getSubscriptions() + " " + + subscription.getName()); + pendingAckStoreFuture.completeExceptionally( + e.getCause()); + return null; + }); return pendingAckStoreFuture; } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index f8fc31ec5fd83..fb94638bf49c1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -19,18 +19,26 @@ package org.apache.pulsar.broker.transaction; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; import com.google.common.collect.Sets; +import java.lang.reflect.Field; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; +import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; @@ -40,12 +48,10 @@ import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; -import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; -import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; -import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterMethod; @@ -167,4 +173,77 @@ public void testGetTxnID() throws Exception { Assert.assertEquals(txnID.getLeastSigBits(), 1); Assert.assertEquals(txnID.getMostSigBits(), 0); } + + @Test + public void testSubscriptionRecreateTopic() + throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException { + String topic = "persistent://pulsar/system/testReCreateTopic"; + String subName = "sub_testReCreateTopic"; + int retentionSizeInMbSetTo = 5; + int retentionSizeInMbSetTopic = 6; + int retentionSizeInMinutesSetTo = 5; + int retentionSizeInMinutesSetTopic = 6; + admin.topics().createNonPartitionedTopic(topic); + PulsarService pulsarService = super.getPulsarServiceList().get(0); + pulsarService.getBrokerService().getTopics().clear(); + ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory(); + Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + field.setAccessible(true); + ConcurrentHashMap> ledgers = + (ConcurrentHashMap>) field.get(managedLedgerFactory); + ledgers.remove(TopicName.get(topic).getPersistenceNamingEncoding()); + try { + admin.topics().createNonPartitionedTopic(topic); + Assert.fail(); + } catch (PulsarAdminException.ConflictException e) { + log.info("Cann`t create topic again"); + } + admin.topics().setRetention(topic, + new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic)); + pulsarClient.newConsumer().topic(topic) + .subscriptionName(subName) + .subscribe(); + pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> { + if (!option.isPresent()) { + log.error("Failed o get Topic named: {}", topic); + Assert.fail(); + } + PersistentTopic originPersistentTopic = (PersistentTopic) option.get(); + String pendingAckTopicName = MLPendingAckStore + .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subName); + + try { + admin.topics().setRetention(pendingAckTopicName, + new RetentionPolicies(retentionSizeInMinutesSetTo, retentionSizeInMbSetTo)); + } catch (PulsarAdminException e) { + log.error("Failed to get./setRetention of topic with Exception:" + e); + Assert.fail(); + } + PersistentSubscription subscription = originPersistentTopic + .getSubscription(subName); + subscription.getPendingAckManageLedger().thenAccept(managedLedger -> { + long retentionSize = managedLedger.getConfig().getRetentionSizeInMB(); + if (!originPersistentTopic.getTopicPolicies().isPresent()) { + log.error("Failed to getTopicPolicies of :" + originPersistentTopic); + Assert.fail(); + } + TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get(); + Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize); + MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider(); + CompletableFuture future = mlPendingAckStoreProvider.newPendingAckStore(subscription); + future.thenAccept(pendingAckStore -> { + ((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> { + Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(), + retentionSizeInMbSetTo); + }); + } + ); + }); + + + }); + + + } + } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index d2e0af4bf7991..f40f80fb12b96 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -134,6 +134,7 @@ protected void startBroker() throws Exception { conf.setSystemTopicEnabled(true); conf.setTransactionBufferSnapshotMaxTransactionCount(2); conf.setTransactionBufferSnapshotMinTimeInMillis(2000); + conf.setTopicLevelPoliciesEnabled(true); serviceConfigurationList.add(conf); PulsarService pulsar = spy(new PulsarService(conf));