Skip to content

Commit

Permalink
Pending ack set managed ledger config true (#11494)
Browse files Browse the repository at this point in the history
## Modivation
Fix the Issue of #11481
In standalone mode, pulsar 2.8.0 cannot be used normally when the transaction is started

## CauseBy
```getTopic```was executed  twice when FunctionWorkService .
```getTopicIfExists```make ```createIfMissing = false``` When the execution ends.
```PersistentSubscription```  will create a ledger for the subscription  when transaction was turned on. 
```new MetadataNotFoundException("Managed ledger not found")```was thrown when calling ```MetaStoreImpl::getManagedLedgerInfo``` 
## implement   
Create a separate ManagerLedgerConfig for PendingAck
## verify 
Add testSubscriptionRecreateTopic in TransactionTest
  • Loading branch information
liangyepianzhou authored Aug 10, 2021
1 parent bfae8f6 commit daf457d
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;


/**
* Provider is for MLPendingAckStore.
*/
Expand All @@ -47,40 +48,68 @@ public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscript
new TransactionPendingAckStoreProviderException("The subscription is null."));
return pendingAckStoreFuture;
}

PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());

originPersistentTopic.getBrokerService().getManagedLedgerFactory()
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
originPersistentTopic.getManagedLedger().getConfig(),
new AsyncCallbacks.OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor(MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
pendingAckStoreFuture
.complete(new MLPendingAckStore(ledger, cursor,
subscription.getCursor()));
}
.asyncExists(TopicName.get(pendingAckTopicName)
.getPersistenceNamingEncoding()).thenAccept(exist -> {
TopicName topicName;
if (exist) {
topicName = TopicName.get(pendingAckTopicName);
} else {
topicName = TopicName.get(originPersistentTopic.getName());
}
originPersistentTopic.getBrokerService()
.getManagedLedgerConfig(topicName).thenAccept(config -> {
config.setCreateIfMissing(true);
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
.asyncOpen(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding(),
config, new AsyncCallbacks.OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor(
MLPendingAckStore.getTransactionPendingAckStoreCursorName(),
InitialPosition.Earliest, new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
pendingAckStoreFuture
.complete(new MLPendingAckStore(ledger, cursor,
subscription.getCursor()));
if (log.isDebugEnabled()) {
log.debug("{},{} open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
subscription.getName());
}
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("Open MLPendingAckStore cursor failed.", exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
}, null);
}
@Override
public void openCursorFailed(ManagedLedgerException exception,
Object ctx) {
log.error("{},{} open MLPendingAckStore cursor failed."
, originPersistentTopic.getName(),
subscription.getName(), exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
}, null);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
log.error("Open MLPendingAckStore managedLedger failed.", exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
log.error("{}, {} open MLPendingAckStore managedLedger failed."
, originPersistentTopic.getName(), subscription.getName(), exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
}, () -> true, null);
});
}).exceptionally(e -> {
log.error("Failed to obtain the existence of ManagerLedger with topic and subscription : "
+ originPersistentTopic.getSubscriptions() + " "
+ subscription.getName());
pendingAckStoreFuture.completeExceptionally(
e.getCause());
return null;
});
return pendingAckStoreFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@
package org.apache.pulsar.broker.transaction;

import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertNotNull;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
Expand All @@ -40,12 +48,10 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -167,4 +173,77 @@ public void testGetTxnID() throws Exception {
Assert.assertEquals(txnID.getLeastSigBits(), 1);
Assert.assertEquals(txnID.getMostSigBits(), 0);
}

@Test
public void testSubscriptionRecreateTopic()
throws PulsarAdminException, NoSuchFieldException, IllegalAccessException, PulsarClientException {
String topic = "persistent://pulsar/system/testReCreateTopic";
String subName = "sub_testReCreateTopic";
int retentionSizeInMbSetTo = 5;
int retentionSizeInMbSetTopic = 6;
int retentionSizeInMinutesSetTo = 5;
int retentionSizeInMinutesSetTopic = 6;
admin.topics().createNonPartitionedTopic(topic);
PulsarService pulsarService = super.getPulsarServiceList().get(0);
pulsarService.getBrokerService().getTopics().clear();
ManagedLedgerFactory managedLedgerFactory = pulsarService.getBrokerService().getManagedLedgerFactory();
Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers =
(ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>>) field.get(managedLedgerFactory);
ledgers.remove(TopicName.get(topic).getPersistenceNamingEncoding());
try {
admin.topics().createNonPartitionedTopic(topic);
Assert.fail();
} catch (PulsarAdminException.ConflictException e) {
log.info("Cann`t create topic again");
}
admin.topics().setRetention(topic,
new RetentionPolicies(retentionSizeInMinutesSetTopic, retentionSizeInMbSetTopic));
pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscribe();
pulsarService.getBrokerService().getTopicIfExists(topic).thenAccept(option -> {
if (!option.isPresent()) {
log.error("Failed o get Topic named: {}", topic);
Assert.fail();
}
PersistentTopic originPersistentTopic = (PersistentTopic) option.get();
String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subName);

try {
admin.topics().setRetention(pendingAckTopicName,
new RetentionPolicies(retentionSizeInMinutesSetTo, retentionSizeInMbSetTo));
} catch (PulsarAdminException e) {
log.error("Failed to get./setRetention of topic with Exception:" + e);
Assert.fail();
}
PersistentSubscription subscription = originPersistentTopic
.getSubscription(subName);
subscription.getPendingAckManageLedger().thenAccept(managedLedger -> {
long retentionSize = managedLedger.getConfig().getRetentionSizeInMB();
if (!originPersistentTopic.getTopicPolicies().isPresent()) {
log.error("Failed to getTopicPolicies of :" + originPersistentTopic);
Assert.fail();
}
TopicPolicies topicPolicies = originPersistentTopic.getTopicPolicies().get();
Assert.assertEquals(retentionSizeInMbSetTopic, retentionSize);
MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
CompletableFuture<PendingAckStore> future = mlPendingAckStoreProvider.newPendingAckStore(subscription);
future.thenAccept(pendingAckStore -> {
((MLPendingAckStore) pendingAckStore).getManagedLedger().thenAccept(managedLedger1 -> {
Assert.assertEquals(managedLedger1.getConfig().getRetentionSizeInMB(),
retentionSizeInMbSetTo);
});
}
);
});


});


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ protected void startBroker() throws Exception {
conf.setSystemTopicEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(2);
conf.setTransactionBufferSnapshotMinTimeInMillis(2000);
conf.setTopicLevelPoliciesEnabled(true);
serviceConfigurationList.add(conf);

PulsarService pulsar = spy(new PulsarService(conf));
Expand Down

0 comments on commit daf457d

Please sign in to comment.