diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index 4e103d09a2fad..cfc1d3db8d36b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import org.apache.bookkeeper.common.annotation.InterfaceAudience; @@ -160,4 +161,14 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M */ void shutdown() throws InterruptedException, ManagedLedgerException; + /** + * Check managed ledger has been initialized before. + * + * @param ledgerName {@link String} + * @return a future represents the result of the operation. + * an instance of {@link Boolean} is returned + * if the operation succeeds. + */ + CompletableFuture asyncExists(String ledgerName); + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 03eab6f86f191..5fcf370b4d9c8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -522,6 +522,11 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) { entryCacheManager.clear(); } + @Override + public CompletableFuture asyncExists(String ledgerName) { + return store.asyncExists(ledgerName); + } + @Override public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException { class Result { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 8b99203c91a57..9f1563be61863 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.mledger.impl; import java.util.List; +import java.util.concurrent.CompletableFuture; + import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -129,4 +131,14 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn * @throws MetaStoreException */ Iterable getManagedLedgers() throws MetaStoreException; + + /** + * Check ledger exists. + * + * @param ledgerName {@link String} + * @return a future represents the result of the operation. + * an instance of {@link Boolean} is returned + * if the operation succeeds. + */ + CompletableFuture asyncExists(String ledgerName); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index e06c9f15611e4..b21b41ec92c21 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import io.netty.buffer.ByteBuf; @@ -257,6 +258,11 @@ public Iterable getManagedLedgers() throws MetaStoreException { } } + @Override + public CompletableFuture asyncExists(String path) { + return store.exists(PREFIX + path); + } + // // update timestamp if missing or 0 // 3 cases - timestamp does not exist for ledgers serialized before diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 012f48a3822a6..9a1232ff08b21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1998,43 +1998,68 @@ protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) { final String topic = command.getTopic(); final int txnAction = command.getTxnAction().getValue(); TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); + final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); if (log.isDebugEnabled()) { log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", topic, txnID, txnAction); } - CompletableFuture> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); - if (topicFuture != null) { - topicFuture.whenComplete((optionalTopic, t) -> { - if (!optionalTopic.isPresent()) { - log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " - + "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( - requestId, ServerError.ServiceNotReady, - "Topic " + topic + " is not found.")); - return; - } - optionalTopic.get().endTxn(txnID, txnAction, command.getTxnidLeastBitsOfLowWatermark()) + CompletableFuture> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); + topicFuture.thenAccept(optionalTopic -> { + if (optionalTopic.isPresent()) { + optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark) .whenComplete((ignored, throwable) -> { if (throwable != null) { - log.error("Handle endTxnOnPartition {} failed.", topic, throwable); + log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], " + + "txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction), throwable); ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( requestId, BrokerServiceException.getClientErrorCode(throwable), - throwable.getMessage())); + throwable.getMessage(), + txnID.getLeastSigBits(), txnID.getMostSigBits())); return; } ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits())); }); - }); - } else { - log.error("handleEndTxnOnPartition faile ! The topic {} does not exist in broker, " - + "txnId: [{}], txnAction: [{}]", topic, txnID, TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), - ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - } + + } else { + getBrokerService().getManagedLedgerFactory() + .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) + .thenAccept((b) -> { + if (b) { + log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, " + + "txnId: [{}], txnAction: [{}]", topic, + txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, + ServerError.ServiceNotReady, + "The topic " + topic + " does not exist in broker.", + txnID.getMostSigBits(), txnID.getLeastSigBits())); + } else { + log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, " + + "txnId: [{}], txnAction: [{}]", + topic, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnPartition fail ! topic {} , " + + "txnId: [{}], txnAction: [{}]", topic, txnID, + TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( + requestId, ServerError.ServiceNotReady, + e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + return null; + }); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnPartition fail ! topic {} , " + + "txnId: [{}], txnAction: [{}]", topic, txnID, + TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse( + requestId, ServerError.ServiceNotReady, + e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits())); + return null; + }); } @Override @@ -2045,70 +2070,82 @@ protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) { final String topic = command.getSubscription().getTopic(); final String subName = command.getSubscription().getSubscription(); final int txnAction = command.getTxnAction().getValue(); + final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); + final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark(); if (log.isDebugEnabled()) { - log.debug("[{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, + log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", topic, subName, new TxnID(txnidMostBits, txnidLeastBits), txnAction); } - CompletableFuture> topicFuture = service.getTopics().get(TopicName.get(topic).toString()); - if (topicFuture != null) { - topicFuture.thenAccept(optionalTopic -> { - - if (!optionalTopic.isPresent()) { - log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: " - + "[{}], txnAction: [{}]", topic, - new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - return; - } - + CompletableFuture> topicFuture = service.getTopicIfExists(TopicName.get(topic).toString()); + topicFuture.thenAccept(optionalTopic -> { + if (optionalTopic.isPresent()) { Subscription subscription = optionalTopic.get().getSubscription(subName); if (subscription == null) { - log.error("Topic {} subscription {} is not exist.", optionalTopic.get().getName(), subName); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "Topic " + optionalTopic.get().getName() - + " subscription " + subName + " is not exist.")); + log.warn("handleEndTxnOnSubscription fail! " + + "topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]", + optionalTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush( + Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); return; } CompletableFuture completableFuture = - subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, - command.getTxnidLeastBitsOfLowWatermark()); - completableFuture.whenComplete((ignored, throwable) -> { - if (throwable != null) { - log.error("Handle end txn on subscription failed for request {}", requestId, throwable); + subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark); + completableFuture.whenComplete((ignored, e) -> { + if (e != null) { + log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( requestId, txnidLeastBits, txnidMostBits, - BrokerServiceException.getClientErrorCode(throwable), + BrokerServiceException.getClientErrorCode(e), "Handle end txn on subscription failed.")); return; } ctx.writeAndFlush( Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits)); }); - }).exceptionally(e -> { - log.error("Handle end txn on subscription failed for request {}", requestId, e); - ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( - requestId, txnidLeastBits, txnidMostBits, - ServerError.ServiceNotReady, - "Handle end txn on subscription failed.")); - return null; - }); - } else { - log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, txnId: " - + "[{}], txnAction: [{}]", topic, - new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); + } else { + getBrokerService().getManagedLedgerFactory() + .asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()) + .thenAccept((b) -> { + if (b) { + log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, " + + "subscription: {} ,txnId: [{}], txnAction: [{}]", topic, subName, + new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( + requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), + ServerError.ServiceNotReady, + "The topic " + topic + " does not exist in broker.")); + } else { + log.warn("handleEndTxnOnSubscription fail ! The topic {} has not been created, " + + "subscription: {} txnId: [{}], txnAction: [{}]", + topic, subName, txnID, TxnAction.valueOf(txnAction)); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, + txnID.getLeastSigBits(), txnID.getMostSigBits())); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); + ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( + requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), + ServerError.ServiceNotReady, e.getMessage())); + return null; + }); + } + }).exceptionally(e -> { + log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}" + + "txnId: [{}], txnAction: [{}]", topic, subName, + txnID, TxnAction.valueOf(txnAction), e.getCause()); ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse( requestId, txnidLeastBits, txnidMostBits, ServerError.ServiceNotReady, - "The topic " + topic + " is not exist in broker.")); - } + "Handle end txn on subscription failed.")); + return null; + }); } private CompletableFuture tryAddSchema(Topic topic, SchemaData schema) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index 635bddd414d0f..d2e0af4bf7991 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -21,8 +21,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import static org.testng.AssertJUnit.assertEquals; -import static org.testng.AssertJUnit.assertNotNull; import com.google.common.util.concurrent.MoreExecutors; import io.netty.channel.EventLoopGroup; import java.util.ArrayList; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index ba73034cb9cf9..08de910ad4b2e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -18,16 +18,10 @@ */ package org.apache.pulsar.broker.transaction.buffer; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doReturn; - import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; import lombok.Cleanup; @@ -37,15 +31,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.pulsar.broker.intercept.MockBrokerInterceptor; -import org.apache.pulsar.broker.service.BrokerService; -import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.transaction.TransactionTestBase; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferHandlerImpl; -import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.transaction.TransactionBufferClient; @@ -56,13 +47,12 @@ import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; -import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.lang.reflect.Field; import java.util.concurrent.ConcurrentSkipListMap; @@ -74,76 +64,38 @@ import static org.testng.Assert.fail; @Test(groups = "broker") -public class TransactionBufferClientTest extends TransactionMetaStoreTestBase { +public class TransactionBufferClientTest extends TransactionTestBase { private static final Logger log = LoggerFactory.getLogger(TransactionBufferClientTest.class); private TransactionBufferClient tbClient; TopicName partitionedTopicName = TopicName.get("persistent", "public", "test", "tb-client"); int partitions = 10; - BrokerService[] brokerServices; private static final String namespace = "public/test"; - private EventLoopGroup eventLoopGroup; - @Override - protected void afterSetup() throws Exception { - pulsarAdmins[0].clusters().createCluster("my-cluster", ClusterData.builder().serviceUrl(pulsarServices[0].getWebServiceAddress()).build()); - pulsarAdmins[0].tenants().createTenant("public", new TenantInfoImpl(Sets.newHashSet(), Sets.newHashSet("my-cluster"))); - pulsarAdmins[0].namespaces().createNamespace(namespace, 10); - pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); - String subName = "test"; - pulsarAdmins[0].topics().createSubscription(partitionedTopicName.getPartitionedTopicName(), subName, MessageId.latest); + @BeforeClass(alwaysRun = true) + protected void setup() throws Exception { + setBrokerCount(3); + internalSetup(); + String[] brokerServiceUrlArr = getPulsarServiceList().get(0).getBrokerServiceUrl().split(":"); + String webServicePort = brokerServiceUrlArr[brokerServiceUrlArr.length -1]; + admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder().serviceUrl("http://localhost:" + webServicePort).build()); + admin.tenants().createTenant("public", + new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(namespace, 10); + admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); pulsarClient.newConsumer() .topic(partitionedTopicName.getPartitionedTopicName()) - .subscriptionName(subName).subscribe(); - tbClient = TransactionBufferClientImpl.create( - ((PulsarClientImpl) pulsarClient), + .subscriptionName("test").subscribe(); + tbClient = TransactionBufferClientImpl.create(pulsarClient, new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer"))); } @Override + @AfterClass(alwaysRun = true) protected void cleanup() throws Exception { - if (tbClient != null) { - tbClient.close(); - } - if (brokerServices != null) { - for (BrokerService bs : brokerServices) { - bs.close(); - } - brokerServices = null; - } - super.cleanup(); - eventLoopGroup.shutdownGracefully().get(); - } - - @Override - protected void afterPulsarStart() throws Exception { - eventLoopGroup = new NioEventLoopGroup(); - brokerServices = new BrokerService[pulsarServices.length]; - AtomicLong atomicLong = new AtomicLong(0); - for (int i = 0; i < pulsarServices.length; i++) { - Subscription mockSubscription = mock(Subscription.class); - Mockito.when(mockSubscription.endTxn(Mockito.anyLong(), - Mockito.anyLong(), Mockito.anyInt(), Mockito.anyLong())) - .thenReturn(CompletableFuture.completedFuture(null)); - - Topic mockTopic = mock(Topic.class); - Mockito.when(mockTopic.endTxn(any(), Mockito.anyInt(), anyLong())) - .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.when(mockTopic.getSubscription(any())).thenReturn(mockSubscription); - - ConcurrentOpenHashMap>> topicMap = - mock(ConcurrentOpenHashMap.class); - Mockito.when(topicMap.get(Mockito.anyString())).thenReturn( - CompletableFuture.completedFuture(Optional.of(mockTopic))); - - BrokerService brokerService = Mockito.spy(new BrokerService(pulsarServices[i], eventLoopGroup)); - doReturn(new MockBrokerInterceptor()).when(brokerService).getInterceptor(); - doReturn(atomicLong.getAndIncrement() + "").when(brokerService).generateUniqueProducerName(); - brokerServices[i] = brokerService; - Mockito.when(brokerService.getTopics()).thenReturn(topicMap); - Mockito.when(pulsarServices[i].getBrokerService()).thenReturn(brokerService); - } + tbClient.close(); + super.internalCleanup(); } @Test @@ -198,43 +150,6 @@ public void testAbortOnSubscription() throws ExecutionException, InterruptedExce } } - @Test - public void testTransactionBufferOpFail() throws InterruptedException, ExecutionException { - ConcurrentOpenHashMap>>[] originalMaps = - new ConcurrentOpenHashMap[brokerServices.length]; - ConcurrentOpenHashMap>> topicMap = new ConcurrentOpenHashMap<>(); - for (int i = 0; i < brokerServices.length; i++) { - originalMaps[i] = brokerServices[i].getTopics(); - when(brokerServices[i].getTopics()).thenReturn(topicMap); - } - - try { - tbClient.abortTxnOnSubscription( - partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof PulsarClientException.LookupException); - } - - try { - tbClient.abortTxnOnTopic( - partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get(); - fail(); - } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof PulsarClientException.LookupException); - } - - for (int i = 0; i < brokerServices.length; i++) { - when(brokerServices[i].getTopics()).thenReturn(originalMaps[i]); - } - - tbClient.abortTxnOnSubscription( - partitionedTopicName.getPartition(0).toString(), "test", 1L, 1, -1L).get(); - - tbClient.abortTxnOnTopic( - partitionedTopicName.getPartition(0).toString(), 1L, 1, -1L).get(); - } - @Test public void testTransactionBufferClientTimeout() throws Exception { PulsarClientImpl mockClient = mock(PulsarClientImpl.class); @@ -314,12 +229,27 @@ public void testTransactionBufferChannelUnActive() { } @Test - public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException { + public void testTransactionBufferLookUp() throws Exception { String topic = "persistent://" + namespace + "/testTransactionBufferLookUp"; - tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get(); - tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get(); - tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get(); - tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get(); + String subName = "test"; + + String abortTopic = topic + "_abort_sub"; + String commitTopic = topic + "_commit_sub"; + admin.topics().createNonPartitionedTopic(abortTopic); + admin.topics().createSubscription(abortTopic, subName, MessageId.earliest); + + admin.topics().createNonPartitionedTopic(commitTopic); + admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); + + waitPendingAckInit(abortTopic, subName); + waitPendingAckInit(commitTopic, subName); + + tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); + + tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get(); + tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get(); } @Test @@ -333,10 +263,69 @@ public void testTransactionBufferHandlerSemaphore() throws Exception { field.setAccessible(true); field.set(transactionBufferHandler, new Semaphore(2)); - String topic = "persistent://" + namespace + "/testTransactionBufferLookUp"; - tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get(); + + String topic = "persistent://" + namespace + "/testTransactionBufferHandlerSemaphore"; + String subName = "test"; + + String abortTopic = topic + "_abort_sub"; + String commitTopic = topic + "_commit_sub"; + + admin.topics().createNonPartitionedTopic(abortTopic); + admin.topics().createSubscription(abortTopic, subName, MessageId.earliest); + + admin.topics().createNonPartitionedTopic(commitTopic); + admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); + + waitPendingAckInit(abortTopic, subName); + waitPendingAckInit(commitTopic, subName); + tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); + + tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnTopic(abortTopic, 1L, 1L, -1L).get(); + tbClient.commitTxnOnTopic(commitTopic, 1L, 1L, -1L).get(); + } + + @Test + public void testEndTopicNotExist() throws Exception { + String topic = "persistent://" + namespace + "/testEndTopicNotExist"; + String sub = "test"; + tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get(); - tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get(); tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get(); + + tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get(); + tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get(); + } + + @Test + public void testEndSubNotExist() throws Exception { + + String topic = "persistent://" + namespace + "/testEndTopicNotExist"; + String sub = "test"; + admin.topics().createNonPartitionedTopic(topic + "_abort_sub"); + + admin.topics().createNonPartitionedTopic(topic + "_commit_sub"); + + tbClient.abortTxnOnSubscription(topic + "_abort_topic", sub, 1L, 1L, -1L).get(); + tbClient.abortTxnOnSubscription(topic + "_commit_topic", sub, 1L, 1L, -1L).get(); + } + + private void waitPendingAckInit(String topic, String sub) throws Exception { + + boolean exist = false; + for (int i = 0; i < getPulsarServiceList().size(); i++) { + CompletableFuture> completableFuture = getPulsarServiceList().get(i) + .getBrokerService().getTopics().get(topic); + if (completableFuture != null) { + PersistentSubscription persistentSubscription = + (PersistentSubscription) completableFuture.get().get().getSubscription(sub); + Awaitility.await().untilAsserted(() -> + assertEquals(persistentSubscription.getTransactionPendingAckStats().state, "Ready")); + exist = true; + } + } + + assertTrue(exist); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cf997baf94896..4484ab94a0799 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1314,10 +1314,13 @@ public static ByteBuf newEndTxnOnPartitionResponse(long requestId, long txnIdLea return serializeWithSize(cmd); } - public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg) { + public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg, + long txnIdLeastBits, long txnIdMostBits) { BaseCommand cmd = localCmd(Type.END_TXN_ON_PARTITION_RESPONSE); CommandEndTxnOnPartitionResponse response = cmd.setEndTxnOnPartitionResponse() .setRequestId(requestId) + .setTxnidMostBits(txnIdMostBits) + .setTxnidLeastBits(txnIdLeastBits) .setError(error); if (errorMsg != null) { response.setMessage(errorMsg);