From 576eb7000ee1c414058eba95eab2aa334d1f0bdb Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 18 May 2023 18:14:04 +0800 Subject: [PATCH 01/13] add proto. --- .../broker/service/BrokerServiceException.java | 2 ++ pulsar-common/src/main/proto/PulsarApi.proto | 4 ++++ .../exceptions/CoordinatorException.java | 13 +++++++++++++ .../src/main/proto/PulsarTransactionMetadata.proto | 3 ++- 4 files changed, 21 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java index 3e77588b2459f..fa35c717f1f88 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java @@ -277,6 +277,8 @@ private static ServerError getClientErrorCode(Throwable t, boolean checkCauseIfU return ServerError.TransactionConflict; } else if (t instanceof CoordinatorException.TransactionNotFoundException) { return ServerError.TransactionNotFound; + } else if (t instanceof CoordinatorException.PreserverClosedException) { + return ServerError.TransactionPreserverClosed; } else { if (checkCauseIfUnknown) { return getClientErrorCode(t.getCause(), false); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..a7c441b5f406c 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -229,6 +229,7 @@ enum ServerError { // use this error to indicate that this producer is now permanently // fenced. 
Applications are now supposed to close it and create a // new producer + TransactionPreserverClosed = 26; // Transaction metadata preserver is closed } enum AuthMethod { @@ -265,6 +266,7 @@ enum ProtocolVersion { v19 = 19; // Add CommandTcClientConnectRequest and CommandTcClientConnectResponse v20 = 20; // Add client support for topic migration redirection CommandTopicMigrated v21 = 21; // Carry the AUTO_CONSUME schema to the Broker after this version + v22 = 22; // Add support for client name in CommandNewTxn and CommandEndTxn } message CommandConnect { @@ -860,6 +862,7 @@ message CommandNewTxn { required uint64 request_id = 1; optional uint64 txn_ttl_seconds = 2 [default = 0]; optional uint64 tc_id = 3 [default = 0]; + optional string client_name = 4; } message CommandNewTxnResponse { @@ -909,6 +912,7 @@ message CommandEndTxn { optional uint64 txnid_least_bits = 2 [default = 0]; optional uint64 txnid_most_bits = 3 [default = 0]; optional TxnAction txn_action = 4; + optional string client_name = 5; } message CommandEndTxnResponse { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java index 2b15b957858fc..6682931f302a4 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/exceptions/CoordinatorException.java @@ -131,4 +131,17 @@ public ReachMaxActiveTxnException(String message) { super(message); } } + + + /** + * Exception is thrown when the transaction preserver is closed. 
+ */ + public static class PreserverClosedException extends CoordinatorException { + + private static final long serialVersionUID = 0L; + + public PreserverClosedException(String message) { + super(message); + } + } } diff --git a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto index 134d1cf3b51a1..fff4cb8a39149 100644 --- a/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto +++ b/pulsar-transaction/coordinator/src/main/proto/PulsarTransactionMetadata.proto @@ -51,7 +51,8 @@ message TransactionMetadataEntry { optional uint64 start_time = 9; optional uint64 last_modification_time = 10; optional uint64 max_local_txn_id = 11; - optional string owner = 12; + optional string owner = 12; + optional string clientName = 13; } message BatchedTransactionMetadataEntry{ From b106ef12bc443cd9a4a57c8e0b6d4dccc6f79160 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 18 May 2023 18:26:34 +0800 Subject: [PATCH 02/13] add preserver. 
--- .../pulsar/broker/ServiceConfiguration.java | 22 ++ .../TransactionRecoverTrackerImpl.java | 28 +- .../TerminatedTransactionMetadataEntry.java | 35 ++ .../TransactionMetadataPreserver.java | 80 +++++ .../transaction/coordinator/TxnMeta.java | 27 +- .../MLTransactionMetadataPreserverImpl.java | 307 ++++++++++++++++++ .../coordinator/impl/TxnMetaImpl.java | 56 +++- 7 files changed, 534 insertions(+), 21 deletions(-) create mode 100644 pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java create mode 100644 pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java create mode 100644 pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 9966912bc8eae..1fc9424c2af32 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2994,6 +2994,28 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, ) private boolean transactionCoordinatorEnabled = false; + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "Max number of txnMeta of aborted transaction to persist in broker." + + "If the number of aborted transaction is greater than this value, the oldest aborted transaction will be " + + "removed from the cache and persisted in the store." + + "default value is 0, disable persistence of aborted transaction." + ) + private int TransactionMetaPersistCount = 0; + + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver." 
+ ) + private long TransactionMetaPersistTimeInHour = 72; + + @FieldContext( + category = CATEGORY_TRANSACTION, + doc = "Interval in seconds to check the expired transaction in TransactionMetadataPreserver." + ) + private long TransactionMetaExpireCheckIntervalInSecond = 300; + + @FieldContext( category = CATEGORY_TRANSACTION, doc = "Class name for transaction metadata store provider" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java index 409c6b314c7e3..7b22bc1eeb2bb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java @@ -117,17 +117,33 @@ public void handleOpenStatusTransaction(long sequenceId, long timeout) { @Override public void appendOpenTransactionToTimeoutTracker() { + if (log.isDebugEnabled()) { + log.debug("Append open transaction to timeout tracker, tcId: {}, openTransactions: {}", + tcId, openTransactions); + } openTransactions.forEach(timeoutTracker::replayAddTransaction); } @Override public void handleCommittingAndAbortingTransaction() { - committingTransactions.forEach(k -> - transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE, - false)); + if(log.isDebugEnabled()) { + log.debug("Handle committing and aborting transaction, tcId: {}, committingTransactions: {}, " + + "abortingTransactions: {}", tcId, committingTransactions, abortingTransactions); + } + committingTransactions.forEach(k -> { + TxnID txnID = new TxnID(tcId, k); + transactionMetadataStoreService.getTxnMeta(txnID) + .thenAccept(txnMeta -> + transactionMetadataStoreService.endTransaction(txnID, TxnAction.COMMIT_VALUE, + false, txnMeta.getClientName())); + }); - abortingTransactions.forEach(k -> - 
transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE, - false)); + abortingTransactions.forEach(k -> { + TxnID txnID = new TxnID(tcId, k); + transactionMetadataStoreService.getTxnMeta(txnID) + .thenAccept(txnMeta -> + transactionMetadataStoreService.endTransaction(txnID, TxnAction.ABORT_VALUE, + false, txnMeta.getClientName())); + }); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java new file mode 100644 index 0000000000000..3680f090b8d50 --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.transaction.coordinator; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.LinkedList; + +@AllArgsConstructor +@NoArgsConstructor +@Getter +@Setter +public class TerminatedTransactionMetadataEntry { + LinkedList txnMetas; +} \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java new file mode 100644 index 0000000000000..e6178e22d175e --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator; + +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; + +import java.util.concurrent.CompletableFuture; + +/** + * An interface for persist metadata of aborted txn and recover from a compacted topic. + */ +public interface TransactionMetadataPreserver { + /** + * Replay transaction metadata to initialize the terminatedTxnMetaMap. + */ + void replay(); + + /** + * Close the transaction metadata preserver. + */ + CompletableFuture closeAsync(); + + /** + * Append the transaction metadata to the system topic __terminated_txn_state. + * + * @param txnMeta + * @return + */ + void append(TxnMeta txnMeta, String clientName); + + /** + * flush the transaction metadata to the system topic __terminated_txn_state + * before the state of txn is aborted. + * @param clientName the client name of the transaction metadata need to be flushed. + */ + void flush(String clientName) throws CoordinatorException.PreserverClosedException; + + /** + * check if transaction metadata preserver is enabled. + * @return + */ + boolean enabled(); + + /** + * Get the aborted transaction metadata of the given transaction id. + * Committed transaction metadata will not be persisted. + * + * @param txnID + * @return + */ + TxnMeta getTxnMeta(TxnID txnID, String clientName); + + /** + * Expire the transaction metadata periodically. + */ + void expireTransactionMetadata(); + + /** + * Get the interval of expiring the transaction metadata in MS. + * @return the interval of expiring the transaction metadata in MS. 
+ */ + long getExpireOldTransactionMetadataIntervalMS(); +} diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java index 44f225b9448fd..fd7c866059270 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TxnMeta.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.transaction.coordinator; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.annotations.Beta; import java.util.List; import org.apache.pulsar.client.api.transaction.TxnID; @@ -28,6 +29,7 @@ * An interface represents the metadata of a transaction in {@link TransactionMetadataStore}. */ @Beta +@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS) public interface TxnMeta { /** @@ -81,6 +83,21 @@ TxnMeta addProducedPartitions(List partitions) TxnMeta addAckedPartitions(List subscriptions) throws InvalidTxnStatusException; + /** + * Clear the list of produced partitions when txnMeta is stored in Preserver. + * @return + * @throws InvalidTxnStatusException + */ + TxnMeta clearProducedPartitions(); + + /** + * Clear the list of acked partitions when txnMeta is stored in Preserver. + * @return + * @throws InvalidTxnStatusException + */ + TxnMeta clearAckedPartitions(); + + /** * Update the transaction stats from the newStatus only when * the current status is the expected expectedStatus. @@ -102,7 +119,9 @@ TxnMeta updateTxnStatus(TxnStatus newStatus, long getOpenTimestamp(); /** - * Return the transaction timeout at. + * Return the transaction timeout. + * WARNING: timeoutAt is not the time point when the transaction will time out, + * but the duration of the transaction timeout. * * @return transaction timeout at. 
*/ @@ -114,4 +133,10 @@ TxnMeta updateTxnStatus(TxnStatus newStatus, * @return transaction's owner. */ String getOwner(); + + /** + * Return the transaction client name. + * @return the transaction client name. + */ + String getClientName(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java new file mode 100644 index 0000000000000..59363302016ea --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.transaction.coordinator.impl; + + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.transaction.coordinator.TerminatedTransactionMetadataEntry; +import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; +import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; + +import static org.apache.commons.lang3.StringUtils.isBlank; + +/** + * A transaction metadata preserver implementation. + * This class is used to persist the transaction metadata of terminated txn to a compacted topic. + * As all methods are executed in single thread poll executor, we do not need to consider the + * concurrent problem. 
+ */ +public class MLTransactionMetadataPreserverImpl implements TransactionMetadataPreserver{ + private static final Logger log = LoggerFactory.getLogger(MLTransactionMetadataPreserverImpl.class); + public static final String TRANSACTION_METADATA_PERSIST_TOPIC_PREFIX = "__terminated_txn_state_"; + + private final int transactionMetaPersistCount; + private final long transactionMetaPersistTimeInMS; + private final long transactionMetaExpireCheckIntervalInMS; + private final long tcID; + private final Producer producer; + private final Reader reader; + // key is the client name, value is the list of transaction metadata of terminated txn. + public final Map> terminatedTxnMetaList = new HashMap<>(); + // key is the client name, value is the map mapping transaction id of terminated txn to txnMeta. + public final Map> terminatedTxnMetaMap = new HashMap<>(); + + public final Set needToFlush = new HashSet<>(); + + /** + * do not enable terminated transaction metadata persist + */ + public MLTransactionMetadataPreserverImpl() { + this.transactionMetaPersistCount = 0; + this.transactionMetaPersistTimeInMS = 0; + this.transactionMetaExpireCheckIntervalInMS = 0; + this.tcID = -1; + this.producer = null; + this.reader = null; + } + + // enable terminated transaction metadata persist + public MLTransactionMetadataPreserverImpl(TransactionCoordinatorID tcID, + int transactionMetaPersistCount, + long transactionMetaPersistTimeInHour, + long transactionMetadataExpireIntervalInSecond, + PulsarClient pulsarClient) { + if (transactionMetaPersistCount <= 0 || pulsarClient == null + || pulsarClient.isClosed() || tcID == null) { + // do not enable terminated transaction metadata persist + log.info("Transaction metadata preserver init failed, transaction metadata persist count is {}, " + + "pulsar client is null or closed, or transaction coordinator id is null.", transactionMetaPersistCount); + this.transactionMetaPersistCount = 0; + this.transactionMetaPersistTimeInMS = 0; + 
this.transactionMetaExpireCheckIntervalInMS = 0; + this.tcID = -1; + this.producer = null; + this.reader = null; + return; + } + + this.tcID = tcID.getId(); + this.transactionMetaPersistCount = transactionMetaPersistCount; + this.transactionMetaExpireCheckIntervalInMS = transactionMetadataExpireIntervalInSecond * 1000; + this.transactionMetaPersistTimeInMS = transactionMetaPersistTimeInHour * 60 * 60 * 1000; + String topicName = getTransactionMetadataPersistTopicName(tcID); + this.producer = pulsarClient.newProducer(Schema.JSON(TerminatedTransactionMetadataEntry.class)) + .topic(topicName) + .createAsync().thenCompose(producer -> { + log.info("Create producer for transaction metadata persist topic {} successfully.", topicName); + return CompletableFuture.completedFuture(producer); + }).join(); + this.reader = pulsarClient.newReader(Schema.JSON(TerminatedTransactionMetadataEntry.class)) + .topic(topicName) + .startMessageId(MessageId.earliest) + .readCompacted(true) + .createAsync().thenCompose(reader -> { + log.info("Create reader for transaction metadata persist topic {} successfully.", topicName); + return CompletableFuture.completedFuture(reader); + }).join(); + log.info("Transaction metadata preserver init successfully, transaction coordinator id is {}, " + + "transaction metadata persist count is {}.", tcID.getId(), transactionMetaPersistCount); + } + + private static String getTransactionMetadataPersistTopicName(TransactionCoordinatorID tcID) { + return TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, + TRANSACTION_METADATA_PERSIST_TOPIC_PREFIX + tcID.getId()).toString(); + } + + /** + * Replay transaction metadata to initialize the terminatedTxnMetaMap. 
+ */ + @Override + public void replay() { + if (!enabled()) { + log.info("Transaction metadata preserver is not enabled, do not replay transaction metadata."); + return; + } + try { + while (reader.hasMessageAvailable()) { + Message entry = reader.readNext(); + String clientName = entry.getKey(); + LinkedList txnMetaList = entry.getValue().getTxnMetas(); + terminatedTxnMetaList.put(clientName, txnMetaList); + HashMap txnMetaMap = new HashMap<>(); + terminatedTxnMetaMap.put(clientName, txnMetaMap); + for (TxnMeta txnMeta : txnMetaList) { + txnMetaMap.put(txnMeta.id(), txnMeta); + } + if (log.isDebugEnabled()) { + log.debug("Replay transaction metadata, tcID:{}, client name:{}.", tcID, clientName); + } + } + log.info("Replay transaction metadata successfully, tcID:{}.", tcID); + } catch (Exception e) { + // Though replay transaction metadata failed, the transaction coordinator can still work. + log.error("Replay transaction metadata failed, tcID:{}, reason:{}.", tcID, e); + } finally { + reader.closeAsync(); + } + } + + + /** + * Close the transaction metadata preserver. + */ + @Override + public CompletableFuture closeAsync() { + if (producer != null) { + return producer.closeAsync(); + } + return CompletableFuture.completedFuture(null); + } + + /** + * Append the transaction metadata to the system topic __terminated_txn_state. + * append method will be called after the transaction become aborting, but + * before the endTxn command is sent to TB/TP; and flush the messages before the + * transaction is terminated, the duration between append and flush method + * can improve the produce efficiency with deduplication of those messages with + * same clientName. 
+ * + * @param txnMeta + * @return + */ + @Override + public void append(TxnMeta txnMeta, String clientName) { + if (!enabled() || clientName == null) { + return; + } + if (terminatedTxnMetaMap.containsKey(clientName)) { + List txnMetaList = terminatedTxnMetaList.get(clientName); + Map txnIDTxnMetaMap = terminatedTxnMetaMap.get(clientName); + // check duplicate + if (txnIDTxnMetaMap.containsKey(txnMeta.id())) { + return; + } else { + txnMetaList.add(txnMeta); + txnIDTxnMetaMap.put(txnMeta.id(), txnMeta); + while (txnMetaList.size() > transactionMetaPersistCount) { + txnIDTxnMetaMap.remove(txnMetaList.remove(0).id()); + } + } + } else { + LinkedList txnMetaList = new LinkedList<>(); + Map txnIDTxnMetaMap = new HashMap<>(); + txnMetaList.add(txnMeta); + txnIDTxnMetaMap.put(txnMeta.id(), txnMeta); + terminatedTxnMetaList.put(clientName, txnMetaList); + terminatedTxnMetaMap.put(clientName, txnIDTxnMetaMap); + } + if (log.isDebugEnabled()) { + log.debug("Append transaction metadata, client name:{}, transaction id:{}, tcID:{}.", + clientName, txnMeta.id(), tcID); + } + needToFlush.add(clientName); + } + + /** + * flush the transaction metadata to the system topic __terminated_txn_state + * before the state of txn is terminated. + */ + @Override + public void flush(String clientName) throws CoordinatorException.PreserverClosedException { + if (!needToFlush.contains(clientName)) { + if (log.isDebugEnabled()) { + log.debug("No need to flush transaction metadata for client name:{}, " + + "previous flush method has done it. 
tcID:{}.", clientName, tcID); + } + return; + } + if (log.isDebugEnabled()) { + log.debug("Flush transaction metadata, client name:{}, tcID:{}.", clientName, tcID); + } + TerminatedTransactionMetadataEntry entry = new TerminatedTransactionMetadataEntry(); + entry.setTxnMetas(terminatedTxnMetaList.get(clientName)); + try{ + producer.newMessage().key(clientName).value(entry).send(); + } catch (PulsarClientException e) { + log.error("Flush transaction metadata failed, client name:{}, tcID:{}, reason:{}.", + clientName, tcID, e); + throw new CoordinatorException.PreserverClosedException(e.getMessage()); + } + needToFlush.remove(clientName); + } + + @Override + public boolean enabled() { + return transactionMetaPersistCount > 0; + } + + @Override + public TxnMeta getTxnMeta(TxnID txnID, String clientName) { + if (!enabled() || isBlank(clientName)) { + return null; + } + // this method is not executed in single thread, so we need to ensure + // NPE exception will not be thrown. + Map txnIDTxnMetaMap = terminatedTxnMetaMap.get(clientName); + if(txnIDTxnMetaMap == null) { + return null; + } + return txnIDTxnMetaMap.get(txnID); + } + + @Override + public void expireTransactionMetadata() { + if(!enabled()) { + return; + } + if (log.isDebugEnabled()) { + log.debug("Start to check transaction metadata expire, tcID:{}.", tcID); + } + long now = System.currentTimeMillis(); + for (Map.Entry> entry : terminatedTxnMetaList.entrySet()) { + String clientName = entry.getKey(); + LinkedList txnMetaList = entry.getValue(); + Map txnIDTxnMetaMap = terminatedTxnMetaMap.get(clientName); + while (!txnMetaList.isEmpty()) { + // peek the oldest transaction metadata + TxnMeta txnMeta = txnMetaList.peek(); + if (now - txnMeta.getTimeoutAt() - txnMeta.getOpenTimestamp() + > transactionMetaPersistTimeInMS) { + txnMetaList.remove(); + txnIDTxnMetaMap.remove(txnMeta.id()); + } else { + break; + } + } + if(txnMetaList.isEmpty()) { + // delete the transaction metadata from the system topic 
__terminated_txn_state + // producer.newMessage().key(clientName).value(null).send(); + terminatedTxnMetaList.remove(clientName); + terminatedTxnMetaMap.remove(clientName); + } + } + } + + @Override + public long getExpireOldTransactionMetadataIntervalMS() { + return transactionMetaExpireCheckIntervalInMS; + } +} \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java index ed38305abf0f3..7cdb415ac3d19 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java @@ -23,6 +23,10 @@ import java.util.HashSet; import java.util.List; import java.util.Set; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Getter; +import lombok.Setter; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; @@ -33,7 +37,12 @@ /** * A class represents the metadata of a transaction stored in * the {@link org.apache.pulsar.transaction.coordinator.TransactionMetadataStore}. +* If serialized, only txnID, txnStatus will be serialized. 
/**
 * No-arg constructor used for deserialization only; every identifying field is left at
 * its default ({@code null} / {@code 0}).
 */
public TxnMetaImpl() {
    this(null, 0, 0, null, null);
}

/**
 * Creates metadata for a transaction that has no client name associated with it.
 *
 * @param txnID         the transaction id
 * @param openTimestamp the time the transaction was opened
 * @param timeoutAt     the timeout of the transaction
 * @param owner         the owner of the transaction
 */
TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner) {
    this(txnID, openTimestamp, timeoutAt, owner, null);
}

/**
 * Creates metadata for a transaction created by the named client.
 *
 * @param txnID         the transaction id
 * @param openTimestamp the time the transaction was opened
 * @param timeoutAt     the timeout of the transaction
 * @param owner         the owner of the transaction
 * @param clientName    the name of the client that created the transaction, may be {@code null}
 */
TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner, String clientName) {
    this.txnID = txnID;
    this.openTimestamp = openTimestamp;
    this.timeoutAt = timeoutAt;
    this.owner = owner;
    this.clientName = clientName;
}
@@ -153,18 +195,4 @@ public synchronized TxnMetaImpl updateTxnStatus(TxnStatus newStatus, return this; } - @Override - public long getOpenTimestamp() { - return this.openTimestamp; - } - - @Override - public long getTimeoutAt() { - return this.timeoutAt; - } - - @Override - public String getOwner() { - return this.owner; - } } From 36ef545281713d78c2b0e9aa5515e86d85220ef6 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 18 May 2023 20:22:35 +0800 Subject: [PATCH 03/13] change MLTransactionStore. --- .../coordinator/TransactionMetadataStore.java | 37 +++++ .../TransactionMetadataStoreProvider.java | 5 +- .../impl/InMemTransactionMetadataStore.java | 27 +++- ...InMemTransactionMetadataStoreProvider.java | 4 +- .../impl/MLTransactionMetadataStore.java | 139 +++++++++++++++++- .../MLTransactionMetadataStoreProvider.java | 4 +- .../coordinator/impl/TxnMetaImpl.java | 2 +- 7 files changed, 204 insertions(+), 14 deletions(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index ff5adb4d409c7..270b7a1eb4119 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -52,6 +52,30 @@ default CompletableFuture getTxnStatus(TxnID txnid) { */ CompletableFuture getTxnMeta(TxnID txnid); + /** + * Query the {@link TxnMeta} of a given transaction txnID + * and clientName from TransactionMetadataPreserver. + * @param txnID + * @param clientName + * @return TxnMeta of the given transaction and clientName. + */ + TxnMeta getTxnMetaFromPreserver(TxnID txnID, String clientName); + + + /** + * check if the transaction metadata preserver is enabled. 
+ * @return true if the transaction metadata preserver is enabled. + */ + boolean transactionMetadataPreserverEnabled(); + + /** + * append the transaction metadata to the transaction metadata preserver. + * @param txnMeta + * @param clientName + */ + CompletableFuture appendTxnMetaToPreserver(TxnMeta txnMeta, String clientName); + + /** * Create a new transaction in the transaction metadata store. * @@ -63,6 +87,19 @@ default CompletableFuture getTxnStatus(TxnID txnid) { */ CompletableFuture newTransaction(long timeoutInMills, String owner); + /** + * Create a new transaction in the transaction metadata store. + * + * @param timeoutInMills the timeout duration of the transaction in mills + * @param owner the role which is the owner of the transaction + * @param clientName the client name that creates the transaction + * @return a future represents the result of creating a new transaction. + * it returns {@link TxnID} as the identifier for identifying the + * transaction. + */ + CompletableFuture newTransaction(long timeoutInMills, String owner, String clientName); + + /** * Add the produced partitions to transaction identified by txnid. 
* diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java index 7145ea1214f95..49e996623dea1 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProvider.java @@ -70,6 +70,7 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th CompletableFuture openStore( TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, - TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator, - TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer); + TransactionRecoverTracker recoverTracker, TransactionMetadataPreserver preserver, + long maxActiveTransactionsPerCoordinator, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, + Timer timer); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 0f3c5e42d7a69..751cb9fa567cd 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -73,18 +73,39 @@ public CompletableFuture getTxnMeta(TxnID txnid) { return getFuture; } + @Override + public TxnMeta getTxnMetaFromPreserver(TxnID txnID, String clientName) { + return 
null; + } + + @Override + public boolean transactionMetadataPreserverEnabled() { + return false; + } + + @Override + public CompletableFuture appendTxnMetaToPreserver(TxnMeta txnMeta, String clientName) { + return CompletableFuture.completedFuture(null); // no preserver in memory; callers chain on this future, so never return null + } + @Override public CompletableFuture newTransaction(long timeoutInMills, String owner) { + return newTransaction(timeoutInMills, owner, null); + } + + @Override + public CompletableFuture newTransaction(long timeoutInMills, String owner, String clientName) { if (owner != null) { if (StringUtils.isBlank(owner)) { return CompletableFuture.failedFuture(new IllegalArgumentException("Owner can't be blank")); } } + TxnID txnID = new TxnID( - tcID.getId(), - localID.getAndIncrement() + tcID.getId(), + localID.getAndIncrement() ); - TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills, owner); + TxnMetaImpl txn = new TxnMetaImpl(txnID, System.currentTimeMillis(), timeoutInMills, owner, clientName); transactions.put(txnID, txn); return CompletableFuture.completedFuture(txnID); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java index f3f5a3e432904..9651a408a0d04 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStoreProvider.java @@ -23,6 +23,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; import
org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; @@ -39,10 +40,11 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, + TransactionMetadataPreserver preserver, long maxActiveTransactionsPerCoordinator, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer) { return CompletableFuture.completedFuture( - new InMemTransactionMetadataStore(transactionCoordinatorId)); + new InMemTransactionMetadataStore(transactionCoordinatorId)); } } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b6eaad2e3e38f..b8c593286323b 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -20,6 +20,9 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; +import io.netty.util.Timeout; +import io.netty.util.Timer; +import io.netty.util.TimerTask; import io.netty.util.concurrent.DefaultThreadFactory; import java.time.Duration; import java.util.ArrayList; @@ -30,6 +33,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; @@ -44,6 
+48,7 @@ import org.apache.pulsar.common.util.RecoverTimeRecord; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; @@ -59,6 +64,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.commons.lang3.StringUtils.isBlank; + /** * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. */ @@ -72,6 +79,8 @@ public class MLTransactionMetadataStore @VisibleForTesting final ConcurrentSkipListMap>> txnMetaMap = new ConcurrentSkipListMap<>(); private final TransactionTimeoutTracker timeoutTracker; + private final TransactionMetadataPreserver transactionMetadataPreserver; + private final Timer timer; private final TransactionMetadataStoreStats transactionMetadataStoreStats; private final LongAdder createdTransactionCount; private final LongAdder committedTransactionCount; @@ -93,6 +102,8 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, this.tcID = tcID; this.transactionLog = mlTransactionLog; this.timeoutTracker = timeoutTracker; + this.transactionMetadataPreserver = new MLTransactionMetadataPreserverImpl(); + this.timer = null; this.transactionMetadataStoreStats = new TransactionMetadataStoreStats(); this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator; @@ -106,6 +117,52 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID, this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); } + public MLTransactionMetadataStore(TransactionCoordinatorID tcID, + MLTransactionLogImpl mlTransactionLog, + 
TransactionTimeoutTracker timeoutTracker, + TransactionMetadataPreserver transactionMetadataPreserver, + Timer timer, + MLTransactionSequenceIdGenerator sequenceIdGenerator, + long maxActiveTransactionsPerCoordinator) { + super(State.None); + this.sequenceIdGenerator = sequenceIdGenerator; + this.tcID = tcID; + this.transactionLog = mlTransactionLog; + this.timeoutTracker = timeoutTracker; + this.transactionMetadataPreserver = transactionMetadataPreserver; + this.timer = timer; + this.transactionMetadataStoreStats = new TransactionMetadataStoreStats(); + + this.maxActiveTransactionsPerCoordinator = maxActiveTransactionsPerCoordinator; + this.createdTransactionCount = new LongAdder(); + this.committedTransactionCount = new LongAdder(); + this.abortedTransactionCount = new LongAdder(); + this.transactionTimeoutCount = new LongAdder(); + this.appendLogCount = new LongAdder(); + DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_" + + tcID.toString() + "_thread_factory"); + this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); + } + + // TimerTask for expire old transaction metadata in preserver + class ExpireOldTransactionMetadataInPreserverTask implements TimerTask { + @Override + public void run(Timeout timeout) throws Exception { + internalPinnedExecutor.execute(() -> { + try { + transactionMetadataPreserver.expireTransactionMetadata(); + } catch (Exception e) { + log.error("Expire old transaction metadata in preserver error", e); + } + // schedule next timeout + timer.newTimeout(ExpireOldTransactionMetadataInPreserverTask.this, + transactionMetadataPreserver.getExpireOldTransactionMetadataIntervalMS(), + TimeUnit.MILLISECONDS); + }); + } + } + + public CompletableFuture init(TransactionRecoverTracker recoverTracker) { CompletableFuture completableFuture = new CompletableFuture<>(); if (!changeToInitializingState()) { @@ -120,6 +177,11 @@ public CompletableFuture init(TransactionRecoverTracke 
@Override public void replayComplete() { recoverTracker.appendOpenTransactionToTimeoutTracker(); + if (transactionMetadataPreserverEnabled()) { + timer.newTimeout(new ExpireOldTransactionMetadataInPreserverTask(), + transactionMetadataPreserver.getExpireOldTransactionMetadataIntervalMS(), + TimeUnit.MILLISECONDS); + } if (!changeToReadyState()) { log.error("Managed ledger transaction metadata store change state error when replay complete"); completableFuture @@ -152,8 +214,10 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran long timeoutAt = transactionMetadataEntry.getTimeoutMs(); final String owner = transactionMetadataEntry.hasOwner() ? transactionMetadataEntry.getOwner() : null; + String clientName = transactionMetadataEntry.hasClientName() + ? transactionMetadataEntry.getClientName() : null; final TxnMetaImpl left = new TxnMetaImpl(txnID, - openTimestamp, timeoutAt, owner); + openTimestamp, timeoutAt, owner, clientName); txnMetaMap.put(transactionId, MutablePair.of(left, positions)); recoverTracker.handleOpenStatusTransaction(txnSequenceId, timeoutAt + openTimestamp); @@ -228,7 +292,30 @@ public CompletableFuture getTxnMeta(TxnID txnID) { } @Override - public CompletableFuture newTransaction(long timeOut, String owner) { + public TxnMeta getTxnMetaFromPreserver(TxnID txnID, String clientName) { + return transactionMetadataPreserver.getTxnMeta(txnID, clientName); + } + + @Override + public boolean transactionMetadataPreserverEnabled() { + return transactionMetadataPreserver.enabled(); + } + + @Override + public CompletableFuture appendTxnMetaToPreserver(TxnMeta txnMeta, String clientName) { + if(!transactionMetadataPreserver.enabled() || isBlank(clientName)) { + return CompletableFuture.completedFuture(null); + } + CompletableFuture completableFuture = new CompletableFuture<>(); + internalPinnedExecutor.execute(()->{ + transactionMetadataPreserver.append(txnMeta, clientName); + completableFuture.complete(null); + }); + return 
completableFuture; + } + + @Override + public CompletableFuture newTransaction(long timeOut, String owner, String clientName) { if (this.maxActiveTransactionsPerCoordinator == 0 || this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) { CompletableFuture completableFuture = new CompletableFuture<>(); @@ -258,13 +345,16 @@ public CompletableFuture newTransaction(long timeOut, String owner) { } transactionMetadataEntry.setOwner(owner); } + if (!StringUtils.isBlank(clientName)) { // persist the client name whenever one was supplied, independent of owner + transactionMetadataEntry.setClientName(clientName); + } transactionLog.append(transactionMetadataEntry) .whenComplete((position, throwable) -> { if (throwable != null) { completableFuture.completeExceptionally(throwable); } else { appendLogCount.increment(); - TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner); + TxnMeta txn = new TxnMetaImpl(txnID, currentTimeMillis, timeOut, owner, clientName); List positions = new ArrayList<>(); positions.add(position); Pair> pair = MutablePair.of(txn, positions); @@ -282,6 +372,12 @@ public CompletableFuture newTransaction(long timeOut, String owner) { } } + @Override + public CompletableFuture newTransaction(long timeoutInMills, String owner) { + return newTransaction(timeoutInMills, owner, null); + } + + @Override public CompletableFuture addProducedPartitionToTxn(TxnID txnID, List partitions) { CompletableFuture promise = new CompletableFuture<>(); @@ -382,10 +478,31 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, return; } getTxnPositionPair(txnID).thenCompose(txnMetaListPair -> { - if (txnMetaListPair.getLeft().status() == newStatus) { + TxnMeta txnMeta = txnMetaListPair.getLeft(); + if (txnMeta.status() == newStatus) { promise.complete(null); return promise; } + if (transactionMetadataPreserver.enabled() && !isBlank(txnMeta.getClientName()) + && (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED)) { + // before changed to committed/aborted status, flush the
metadata to __terminated_txn_state. + // so the state of transaction persisted in __terminated_txn_state may be committing instead + // of committed, we should regard committing as committed when catch TransactionNotFound exception. + // As the endTxn command in TB has been executed successfully, the partition metadata + // is useless, we will remove it to save memory and reduce message size. + txnMeta.clearProducedPartitions(); + txnMeta.clearAckedPartitions(); + if(log.isDebugEnabled()) { + log.debug("flush txnId:{} metadata before update status to {}", + txnID, newStatus.name()); + } + try { + transactionMetadataPreserver.flush(txnMeta.getClientName()); + } catch (CoordinatorException.PreserverClosedException e) { + promise.completeExceptionally(e); + return promise; + } + } TransactionMetadataEntry transactionMetadataEntry = new TransactionMetadataEntry() .setTxnidMostBits(txnID.getMostSigBits()) .setTxnidLeastBits(txnID.getLeastSigBits()) @@ -399,14 +516,23 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, .thenAccept(position -> { appendLogCount.increment(); try { - synchronized (txnMetaListPair.getLeft()) { - txnMetaListPair.getLeft().updateTxnStatus(newStatus, expectedStatus); + synchronized (txnMeta) { + if (txnMeta.status() == newStatus) { + transactionLog.deletePosition(Collections.singletonList(position)); + promise.complete(null); + return; + } + txnMeta.updateTxnStatus(newStatus, expectedStatus); txnMetaListPair.getRight().add(position); } if (newStatus == TxnStatus.ABORTING && isTimeout) { this.transactionTimeoutCount.increment(); } if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { + if(log.isDebugEnabled()) { + log.debug("TxnID : " + txnMeta.id().toString() + + " update txn status to " + newStatus.name()); + } this.transactionMetadataStoreStats .addTransactionExecutionLatencySample(System.currentTimeMillis() - txnMetaListPair.getLeft().getOpenTimestamp()); @@ -489,6 +615,7 @@ public 
CompletableFuture closeAsync() { new IllegalStateException( "Managed ledger transaction metadata store state to close error!")); } + transactionMetadataPreserver.closeAsync(); // Shutdown the ExecutorService MoreExecutors.shutdownAndAwaitTermination(internalPinnedExecutor, Duration.ofSeconds(5L)); return CompletableFuture.completedFuture(null); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java index eb12a3dfb77e4..bb75fcd8fbf81 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStoreProvider.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; @@ -66,6 +67,7 @@ public CompletableFuture openStore(TransactionCoordina ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker, + TransactionMetadataPreserver preserver, long maxActiveTransactionsPerCoordinator, TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, Timer timer) { @@ -76,7 +78,7 @@ public CompletableFuture openStore(TransactionCoordina // MLTransactionLogInterceptor will init sequenceId and update the sequenceId to managedLedger properties. 
return txnLog.initialize().thenCompose(__ -> - new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, + new MLTransactionMetadataStore(transactionCoordinatorId, txnLog, timeoutTracker, preserver, timer, mlTransactionSequenceIdGenerator, maxActiveTransactionsPerCoordinator).init(recoverTracker)); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java index 7cdb415ac3d19..ef73b7cac69f5 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java @@ -72,7 +72,7 @@ public TxnMetaImpl() { this.clientName = null; } - TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt,, String owner, String clientName) { + TxnMetaImpl(TxnID txnID, long openTimestamp, long timeoutAt, String owner, String clientName) { this.txnID = txnID; this.openTimestamp = openTimestamp; this.timeoutAt = timeoutAt; From a2bb16f6f188440bba40058caaeb3317d46a4855 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 May 2023 10:44:35 +0800 Subject: [PATCH 04/13] add support in client. 
--- .../TransactionMetadataStoreService.java | 100 ++++++++++++++++-- .../pulsar/broker/service/ServerCnx.java | 26 ++++- .../pulsar/client/api/ClientBuilder.java | 8 ++ .../pulsar/client/api/transaction/TxnID.java | 20 ++-- .../pulsar/client/impl/ClientBuilderImpl.java | 9 ++ .../impl/TransactionMetaStoreHandler.java | 12 ++- .../impl/conf/ClientConfigurationData.java | 7 ++ .../TransactionCoordinatorClientImpl.java | 6 +- .../impl/transaction/TransactionImpl.java | 6 ++ .../pulsar/common/protocol/Commands.java | 17 ++- 10 files changed, 175 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 3e3b044ec51b8..ffa6b311d2e5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker; +import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.getMLTransactionLogName; import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.ABORTING; import static org.apache.pulsar.transaction.coordinator.proto.TxnStatus.COMMITTING; @@ -57,6 +58,7 @@ import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker; @@ -64,9 +66,11 @@ import 
org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker; import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory; import org.apache.pulsar.transaction.coordinator.TxnMeta; +import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataPreserverImpl; import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.slf4j.Logger; @@ -137,7 +141,19 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc TransactionRecoverTracker recoverTracker = new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this, timeoutTracker, tcId.getId()); - openTransactionMetadataStore(tcId, timeoutTracker, recoverTracker).thenAccept( + TransactionMetadataPreserver preserver; + try { + preserver = new MLTransactionMetadataPreserverImpl(tcId, + pulsarService.getConfiguration().getTransactionMetaPersistCount(), + pulsarService.getConfiguration().getTransactionMetaPersistTimeInHour(), + pulsarService.getConfiguration().getTransactionMetaExpireCheckIntervalInSecond(), + pulsarService.getClient()); + preserver.replay(); + } catch (Throwable e) { + LOG.error("Failed to create transaction metadata preserver for tcId {}, reason:{}", tcId, e); + preserver = new MLTransactionMetadataPreserverImpl(); + } + openTransactionMetadataStore(tcId, preserver, timeoutTracker, recoverTracker).thenAccept( store -> internalPinnedExecutor.execute(() -> { // TransactionMetadataStore initialization // need to use 
TransactionMetadataStore itself. @@ -214,6 +230,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc public CompletableFuture openTransactionMetadataStore(TransactionCoordinatorID tcId, + TransactionMetadataPreserver preserver, TransactionTimeoutTracker timeoutTracker, TransactionRecoverTracker recoverTracker) { final Timer brokerClientSharedTimer = pulsarService.getBrokerClientSharedTimer(); @@ -228,7 +245,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose( v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v, - timeoutTracker, recoverTracker, + timeoutTracker, recoverTracker, preserver, pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), txnLogBufferedWriterConfig, brokerClientSharedTimer)); } @@ -257,12 +274,12 @@ public CompletableFuture removeTransactionMetadataStore(TransactionCoordin } public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills, - String owner) { + String owner, String clientName) { TransactionMetadataStore store = stores.get(tcId); if (store == null) { return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId)); } - return store.newTransaction(timeoutInMills, owner); + return store.newTransaction(timeoutInMills, owner, clientName); } public CompletableFuture addProducedPartitionToTxn(TxnID txnId, List partitions) { @@ -312,14 +329,31 @@ public CompletableFuture updateTxnStatus(TxnID txnId, TxnStatus newStatus, return store.updateTxnStatus(txnId, newStatus, expectedStatus, isTimeout); } + public CompletableFuture appendTxnMetaToPreserver(TxnID txnID, TxnMeta txnMeta, String clientName) { + TransactionCoordinatorID tcId = getTcIdFromTxnId(txnID); + TransactionMetadataStore store = stores.get(tcId); + if (store == null) { + return FutureUtil.failedFuture(new 
CoordinatorNotFoundException(tcId)); + } + return store.appendTxnMetaToPreserver(txnMeta, clientName); + } + public CompletableFuture endTransaction(TxnID txnID, int txnAction, boolean isTimeout) { CompletableFuture future = new CompletableFuture<>(); - endTransaction(txnID, txnAction, isTimeout, future); + endTransaction(txnID, txnAction, isTimeout, future, null); + return future; + } + + public CompletableFuture endTransaction(TxnID txnID, int txnAction, + boolean isTimeout, String clientName) { + CompletableFuture future = new CompletableFuture<>(); + endTransaction(txnID, txnAction, isTimeout, future, clientName); return future; } + public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, - CompletableFuture future) { + CompletableFuture future, String clientName) { TxnStatus newStatus; switch (txnAction) { case TxnAction.COMMIT_VALUE: @@ -335,20 +369,61 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, future.completeExceptionally(exception); return; } + TransactionCoordinatorID tcId = getTcIdFromTxnId(txnID); + TransactionMetadataStore store = stores.get(tcId); + if (store == null) { + future.completeExceptionally(new CoordinatorNotFoundException(tcId)); + return; + } getTxnMeta(txnID) .thenCompose(txnMeta -> { if (txnMeta.status() == TxnStatus.OPEN) { return updateTxnStatus(txnID, newStatus, TxnStatus.OPEN, isTimeout) + .thenCompose(__ -> appendTxnMetaToPreserver(txnID, txnMeta, clientName)) .thenCompose(__ -> endTxnInTransactionBuffer(txnID, txnAction)); + } else if (txnMeta.status() == TxnStatus.COMMITTED + && txnAction == TxnAction.COMMIT_VALUE) { + future.complete(null); + return future; + } else if (txnMeta.status() == TxnStatus.ABORTED + && txnAction == TxnAction.ABORT_VALUE) { + future.complete(null); + return future; } + return fakeAsyncCheckTxnStatus(txnMeta.status(), txnAction, txnID, newStatus) + .thenCompose(__ -> appendTxnMetaToPreserver(txnID, txnMeta, clientName)) .thenCompose(__ -> 
endTxnInTransactionBuffer(txnID, txnAction)); - }).whenComplete((__, ex)-> { + }).whenComplete((__, ex) -> { if (ex == null) { future.complete(null); return; } if (!isRetryableException(ex)) { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof CoordinatorException.TransactionNotFoundException + && !isBlank(clientName) && store.transactionMetadataPreserverEnabled()) { + TxnMeta txnMeta = store.getTxnMetaFromPreserver(txnID, clientName); + if (txnAction == TxnAction.COMMIT_VALUE && txnMeta != null + && (txnMeta.status() == TxnStatus.COMMITTED + || txnMeta.status() == TxnStatus.COMMITTING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to commit a transaction that is already committed. " + + "TxnId : {}, clientName:{}.", txnID, clientName); + } + future.complete(null); + return; + } else if (txnAction == TxnAction.ABORT_VALUE && txnMeta != null + && (txnMeta.status() == TxnStatus.ABORTED + || txnMeta.status() == TxnStatus.ABORTING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to abort a transaction that is already aborted. " + + "TxnId : {}, clientName:{}.", txnID, clientName); + } + future.complete(null); + return; + } + } LOG.error("End transaction fail! TxnId : {}, " + "TxnAction : {}", txnID, txnAction, ex); future.completeExceptionally(ex); @@ -359,7 +434,7 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, + "TxnAction : {}", txnID, txnAction, ex); } transactionOpRetryTimer.newTimeout(timeout -> - endTransaction(txnID, txnAction, isTimeout, future), + endTransaction(txnID, txnAction, isTimeout, future, clientName), endTransactionRetryIntervalTime, TimeUnit.MILLISECONDS); }); } @@ -395,15 +470,20 @@ public void handleOpFail(Throwable e, TransactionCoordinatorID tcId) { } public void endTransactionForTimeout(TxnID txnID) { + if (LOG.isDebugEnabled()) { + LOG.debug("Transaction timeout! 
TxnId : {}", txnID); + } + final String[] clientName = {null}; getTxnMeta(txnID).thenCompose(txnMeta -> { + clientName[0] = txnMeta.getClientName(); if (txnMeta.status() == TxnStatus.OPEN) { - return endTransaction(txnID, TxnAction.ABORT_VALUE, true); + return endTransaction(txnID, TxnAction.ABORT_VALUE, true, clientName[0]); } else { return null; } }).exceptionally(e -> { if (isRetryableException(e)) { - endTransaction(txnID, TxnAction.ABORT_VALUE, true); + endTransaction(txnID, TxnAction.ABORT_VALUE, true, clientName[0]); } else { if (LOG.isDebugEnabled()) { LOG.debug("Transaction have been handle complete, " diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 888668e15b167..51e01ffc61ce1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2443,6 +2443,12 @@ private Throwable handleTxnException(Throwable ex, String op, long requestId) { return new CoordinatorException.CoordinatorNotFoundException(cause.getMessage()); } + if (cause instanceof CoordinatorException.PreserverClosedException) { + if (log.isDebugEnabled()) { + log.debug("The transaction metadata preserver was closed for the request {}", op); + } + return cause; + } log.error("Send response error for {} request {}.", op, requestId, cause); return cause; } @@ -2451,6 +2457,12 @@ protected void handleNewTxn(CommandNewTxn command) { checkArgument(state == State.Connected); final long requestId = command.getRequestId(); final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId()); + final String clientName; + if(command.hasClientName()) { + clientName = command.getClientName(); + } else { + clientName = null; + } if (log.isDebugEnabled()) { log.debug("Receive new txn request {} to transaction meta store {} from {}.", requestId, tcId, 
remoteAddress); @@ -2463,7 +2475,7 @@ protected void handleNewTxn(CommandNewTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); final String owner = getPrincipal(); - transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner) + transactionMetadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds(), owner, clientName) .whenComplete(((txnID, ex) -> { if (ex == null) { if (log.isDebugEnabled()) { @@ -2559,8 +2571,18 @@ protected void handleEndTxn(CommandEndTxn command) { checkArgument(state == State.Connected); final long requestId = command.getRequestId(); final int txnAction = command.getTxnAction().getValue(); + final String clientName; + if(command.hasClientName()) { + clientName = command.getClientName(); + } else { + clientName = null; + } TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits()); final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits()); + if (log.isDebugEnabled()) { + log.debug("Receive end txn request {} to transaction meta store {} for txnId:{}.", + requestId, tcId, txnID); + } if (!checkTransactionEnableAndSendError(requestId)) { return; @@ -2574,7 +2596,7 @@ protected void handleEndTxn(CommandEndTxn command) { if (!isOwner) { return failedFutureTxnNotOwned(txnID); } - return transactionMetadataStoreService.endTransaction(txnID, txnAction, false); + return transactionMetadataStoreService.endTransaction(txnID, txnAction, false, clientName); }) .whenComplete((v, ex) -> { if (ex == null) { diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 8b959690a0363..f6eca8d43e02b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ 
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -126,6 +126,14 @@ public interface ClientBuilder extends Serializable, Cloneable { */ ClientBuilder listenerName(String name); + /** + * Configure the client name which is globally unique. + * used to save transaction metadata. + * @param name + * @return + */ + ClientBuilder clientName(String name); + /** * Release the connection if it is not used for more than {@param connectionMaxIdleSeconds} seconds. * @return the client builder instance diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java index 772c709bfc07c..ba859ecd1b791 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java @@ -21,8 +21,10 @@ import java.io.Serializable; import java.util.Objects; import lombok.AccessLevel; +import lombok.AllArgsConstructor; import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -32,6 +34,8 @@ @InterfaceAudience.Public @InterfaceStability.Evolving @Data +@AllArgsConstructor +@NoArgsConstructor public class TxnID implements Serializable { private static final long serialVersionUID = 0L; @@ -41,36 +45,28 @@ public class TxnID implements Serializable { * * @serial */ - private final long mostSigBits; + private long mostSigBits; /* * The least significant 64 bits of this TxnID. 
* * @serial */ - private final long leastSigBits; - - @Getter(AccessLevel.NONE) - private final transient int hashCode; - - @Getter(AccessLevel.NONE) - private final transient String txnStr; + private long leastSigBits; public TxnID(long mostSigBits, long leastSigBits) { this.mostSigBits = mostSigBits; this.leastSigBits = leastSigBits; - this.hashCode = Objects.hash(mostSigBits, leastSigBits); - this.txnStr = "(" + mostSigBits + "," + leastSigBits + ")"; } @Override public String toString() { - return txnStr; + return "(" + mostSigBits + "," + leastSigBits + ")"; } @Override public int hashCode() { - return hashCode; + return Objects.hash(mostSigBits, leastSigBits); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 7677045f0899b..6d165528da3f7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -113,6 +113,15 @@ public ClientBuilder connectionMaxIdleSeconds(int connectionMaxIdleSeconds) { return this; } + @Override + public ClientBuilder clientName(String name) { + if (StringUtils.isBlank(name)) { + throw new IllegalArgumentException("Param clientName must not be blank."); + } + conf.setClientName(name); + return this; + } + @Override public ClientBuilder authentication(Authentication authentication) { conf.setAuthentication(authentication); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java index 601fa2b8f815a..359aec2644be5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TransactionMetaStoreHandler.java @@ -195,7 +195,7 @@ private 
void failPendingRequest() { this.pendingRequests.clear(); } - public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) { + public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit, String clientName) { if (LOG.isDebugEnabled()) { LOG.debug("New transaction with timeout in ms {}", unit.toMillis(timeout)); } @@ -204,7 +204,7 @@ public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) return callback; } long requestId = client.newRequestId(); - ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout)); + ByteBuf cmd = Commands.newTxn(transactionCoordinatorId, requestId, unit.toMillis(timeout), clientName); OpForTxnIdCallBack op = OpForTxnIdCallBack.create(cmd, callback, client); internalPinnedExecutor.execute(() -> { pendingRequests.put(requestId, op); @@ -442,7 +442,7 @@ public void handleAddSubscriptionToTxnResponse(CommandAddSubscriptionToTxnRespon }); } - public CompletableFuture endTxnAsync(TxnID txnID, TxnAction action) { + public CompletableFuture endTxnAsync(TxnID txnID, TxnAction action, String clientName) { if (LOG.isDebugEnabled()) { LOG.debug("End txn {}, action {}", txnID, action); } @@ -451,7 +451,8 @@ public CompletableFuture endTxnAsync(TxnID txnID, TxnAction action) { return callback; } long requestId = client.newRequestId(); - BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), action); + BaseCommand cmd = Commands.newEndTxn(requestId, txnID.getLeastSigBits(), + txnID.getMostSigBits(), action, clientName); ByteBuf buf = Commands.serializeWithSize(cmd); OpForVoidCallBack op = OpForVoidCallBack.create(buf, callback, client); internalPinnedExecutor.execute(() -> { @@ -525,7 +526,8 @@ void handleEndTxnResponse(CommandEndTxnResponse response) { private boolean checkIfNeedRetryByError(ServerError error, String message, OpBase op) { - if (error == ServerError.TransactionCoordinatorNotFound) { + if (error == 
ServerError.TransactionCoordinatorNotFound + || error == ServerError.TransactionPreserverClosed) { if (getState() != State.Connecting) { connectionHandler.reconnectLater(new TransactionCoordinatorClientException .CoordinatorNotFoundException(message)); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 7d94675ccba7d..92e85bd132b14 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -257,6 +257,13 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private String listenerName; + @ApiModelProperty( + name = "clientName", + value = "Client name that is used to save transaction metadata." + ) + private String clientName; + + @ApiModelProperty( name = "useKeyStoreTls", value = "Set TLS using KeyStore way." 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java index 9e79fc203c225..04a5831700c4e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionCoordinatorClientImpl.java @@ -173,7 +173,7 @@ public TxnID newTransaction(long timeout, TimeUnit unit) throws TransactionCoord @Override public CompletableFuture newTransactionAsync(long timeout, TimeUnit unit) { - return nextHandler().newTransactionAsync(timeout, unit); + return nextHandler().newTransactionAsync(timeout, unit, pulsarClient.getConfiguration().getClientName()); } @Override @@ -238,7 +238,7 @@ public CompletableFuture commitAsync(TxnID txnID) { new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException( txnID.getMostSigBits())); } - return handler.endTxnAsync(txnID, TxnAction.COMMIT); + return handler.endTxnAsync(txnID, TxnAction.COMMIT, pulsarClient.getConfiguration().getClientName()); } @Override @@ -258,7 +258,7 @@ public CompletableFuture abortAsync(TxnID txnID) { new TransactionCoordinatorClientException.MetaStoreHandlerNotExistsException( txnID.getMostSigBits())); } - return handler.endTxnAsync(txnID, TxnAction.ABORT); + return handler.endTxnAsync(txnID, TxnAction.ABORT, pulsarClient.getConfiguration().getClientName()); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index d1260ba045e6d..08548caed2c32 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ 
-187,6 +187,9 @@ public void registerAckOp(CompletableFuture newAckFuture) { @Override public CompletableFuture commit() { timeout.cancel(); + if (state == State.COMMITTED) { + return CompletableFuture.completedFuture(null); + } return checkState(State.OPEN, State.COMMITTING).thenCompose((value) -> { CompletableFuture commitFuture = new CompletableFuture<>(); this.state = State.COMMITTING; @@ -218,6 +221,9 @@ public CompletableFuture commit() { @Override public CompletableFuture abort() { timeout.cancel(); + if (state == State.ABORTED) { + return CompletableFuture.completedFuture(null); + } return checkState(State.OPEN, State.ABORTING).thenCompose(__ -> internalAbort()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cf0cd820a6d10..1e99502301768 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -62,6 +62,7 @@ import org.apache.pulsar.common.api.proto.CommandAuthChallenge; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandConnected; +import org.apache.pulsar.common.api.proto.CommandEndTxn; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse; import org.apache.pulsar.common.api.proto.CommandEndTxnResponse; @@ -75,6 +76,7 @@ import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse; import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse.LookupType; import org.apache.pulsar.common.api.proto.CommandMessage; +import org.apache.pulsar.common.api.proto.CommandNewTxn; import org.apache.pulsar.common.api.proto.CommandNewTxnResponse; import org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse; import 
org.apache.pulsar.common.api.proto.CommandProducer; @@ -1330,12 +1332,15 @@ public static ByteBuf newGetOrCreateSchemaResponseError(long requestId, ServerEr // ---- transaction related ---- - public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) { + public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds, String clientName) { BaseCommand cmd = localCmd(Type.NEW_TXN); - cmd.setNewTxn() + CommandNewTxn commandNewTxn = cmd.setNewTxn() .setTcId(tcId) .setRequestId(requestId) .setTxnTtlSeconds(ttlSeconds); + if(clientName != null) { + commandNewTxn.setClientName(clientName); + } return serializeWithSize(cmd); } @@ -1434,12 +1439,16 @@ public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnId return serializeWithSize(cmd); } - public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, long txnIdMostBits, TxnAction txnAction) { + public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, long txnIdMostBits, + TxnAction txnAction, String clientName) { BaseCommand cmd = localCmd(Type.END_TXN); - cmd.setEndTxn() + CommandEndTxn commandEndTxn = cmd.setEndTxn() .setRequestId(requestId) .setTxnidLeastBits(txnIdLeastBits).setTxnidMostBits(txnIdMostBits) .setTxnAction(txnAction); + if(clientName != null) { + commandEndTxn.setClientName(clientName); + } return cmd; } From ceff5622ff6b971d672318011f18771b2eab89ad Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 May 2023 10:54:23 +0800 Subject: [PATCH 05/13] add test code. 
--- .../broker/service/BrokerServiceTest.java | 1 + .../TransactionMetadataStoreServiceTest.java | 110 ++++++++++++++++++ .../broker/transaction/TransactionTest.java | 69 +++++++++++ .../impl/TransactionClientConnectTest.java | 4 +- .../TransactionMetadataStoreProviderTest.java | 2 +- 5 files changed, 183 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 90b00bacaaa88..b887d55f59d8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -1460,6 +1460,7 @@ public void testIsSystemTopic() { assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot"))); assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0"))); assertTrue(brokerService.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1"))); + assertTrue(brokerService.isSystemTopic(TopicName.get("persistent://pulsar/system/__terminated_txn_state_0"))); assertTrue(brokerService.isSystemTopic(TopicName .get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack"))); assertTrue(brokerService.isSystemTopic( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index 5cd3ed9f90454..f3b140ed482fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -116,6 +116,116 @@ public void testNewTransaction() throws Exception { Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 
0); } + @Test + public void testCommitAndAbortTerminatedTransactionWithPreserverEnabled() throws Exception { + TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); + pulsar.getConfiguration().setTransactionMetaPersistCount(2); + transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)); + Awaitility.await().until(() -> + transactionMetadataStoreService.getStores().size() == 1); + TransactionMetadataStore transactionMetadataStore=transactionMetadataStoreService.getStores().get(TransactionCoordinatorID.get(0)); + checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(0))); + TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + Assert.assertEquals(txnID0.getMostSigBits(), 0); + transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient").get(); + transactionMetadataStore.getTxnMeta(txnID0).handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(throwable instanceof CoordinatorException.TransactionNotFoundException); + return null; + }).get(); + // commit again. + transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient").get(); + + + TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + Assert.assertEquals(txnID0.getMostSigBits(), 0); + transactionMetadataStoreService.endTransaction(txnID1, TxnAction.ABORT_VALUE, false, "txnClient").get(); + transactionMetadataStore.getTxnMeta(txnID1).handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(throwable instanceof CoordinatorException.TransactionNotFoundException); + return null; + }).get(); + // abort again. 
+ transactionMetadataStoreService.endTransaction(txnID1, TxnAction.ABORT_VALUE, false, "txnClient").get(); + + + // create and commit third transaction, which will trigger the first transaction to be removed. + TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + Assert.assertEquals(txnID2.getMostSigBits(), 0); + transactionMetadataStoreService.endTransaction(txnID2, TxnAction.COMMIT_VALUE, false, "txnClient").get(); + transactionMetadataStore.getTxnMeta(txnID2).handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(throwable instanceof CoordinatorException.TransactionNotFoundException); + return null; + }).get(); + // recommit the first transaction, which will be failed. + transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient") + .handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(FutureUtil.unwrapCompletionException(throwable) + instanceof CoordinatorException.TransactionNotFoundException); + return null; + }).get(); + + + // close the preserver and reopen it. + Field preserverField = transactionMetadataStore.getClass().getDeclaredField("transactionMetadataPreserver"); + preserverField.setAccessible(true); + TransactionMetadataPreserver preserver = (TransactionMetadataPreserver) preserverField.get(transactionMetadataStore); + preserver.closeAsync().get(); + preserverField.set(transactionMetadataStore, new MLTransactionMetadataPreserverImpl( + TransactionCoordinatorID.get(0l), 2, 1l, 600l, pulsarClient + )); + preserver = (TransactionMetadataPreserver) preserverField.get(transactionMetadataStore); + preserver.replay(); + Assert.assertNull(preserver.getTxnMeta(txnID0,"txnClient")); + Assert.assertNotNull(preserver.getTxnMeta(txnID1,"txnClient")); + Assert.assertNotNull(preserver.getTxnMeta(txnID2,"txnClient")); + + + // try to expire all the transactions. 
+ Field transactionMetaExpireCheckIntervalInMSField = preserver.getClass().getDeclaredField("transactionMetaPersistTimeInMS"); + transactionMetaExpireCheckIntervalInMSField.setAccessible(true); + transactionMetaExpireCheckIntervalInMSField.set(preserver, Long.MIN_VALUE); + Method expireMethod = preserver.getClass().getDeclaredMethod("expireTransactionMetadata"); + expireMethod.invoke(preserver); + transactionMetadataStoreService.endTransaction(txnID2, TxnAction.COMMIT_VALUE, false, "txnClient") + .handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(FutureUtil.unwrapCompletionException(throwable) + instanceof CoordinatorException.TransactionNotFoundException); + return null; + }).get(); + } + + @Test + public void testCommitAndAbortTerminatedTransactionWithPreserverClosed() throws Exception { + TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); + pulsar.getConfiguration().setTransactionMetaPersistCount(10); + transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)); + Awaitility.await().until(() -> + transactionMetadataStoreService.getStores().size() == 1); + TransactionMetadataStore transactionMetadataStore=transactionMetadataStoreService.getStores().get(TransactionCoordinatorID.get(0)); + checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() + .getStores().get(TransactionCoordinatorID.get(0))); + Field preserverField = transactionMetadataStore.getClass().getDeclaredField("transactionMetadataPreserver"); + preserverField.setAccessible(true); + TransactionMetadataPreserver preserver = (TransactionMetadataPreserver) preserverField.get(transactionMetadataStore); + preserver.closeAsync().get(); + TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + Assert.assertEquals(txnID0.getMostSigBits(), 0); + 
transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient") + .handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(FutureUtil.unwrapCompletionException(throwable) + instanceof CoordinatorException.PreserverClosedException); + return null; + }).get(); + } + + + @Test public void testAddProducedPartitionToTxn() throws Exception { TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index c4ec2ec766e32..2757fc21ce93d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -951,6 +951,75 @@ public void testEndTxnWhenCommittingOrAborting() throws Exception { Awaitility.await().untilAsserted(() -> assertEquals(listener.getCommittedTxnCount(),1)); } + @Test + public void testConcurrentCommit() throws Exception { + String topic = NAMESPACE1 + "/testConcurrentCommit"; + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(true) + // ensure that batch message is sent + .batchingMaxPublishDelay(3, TimeUnit.SECONDS) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.MINUTES).build().get(); + + // send batch message, the size is 5 + for (int i = 0; i < 5; i++) { + producer.newMessage(txn).value(("test" + i).getBytes()); + } + producer.flush(); + + CompletableFuture future = new CompletableFuture(); + // try to commit many times. 
+ while (!txn.getState().equals(Transaction.State.COMMITTED)) { + txn.commit().exceptionally(e -> { + future.completeExceptionally(e); + return null; + }); + } + assertTrue(!future.isCompletedExceptionally()); + + txn.commit().get(); + } + + @Test + public void testConcurrentAbort() throws Exception { + String topic = NAMESPACE1 + "/testConcurrentCommit"; + @Cleanup + Producer producer = pulsarClient + .newProducer(Schema.BYTES) + .topic(topic) + .enableBatching(true) + // ensure that batch message is sent + .batchingMaxPublishDelay(3, TimeUnit.SECONDS) + .sendTimeout(0, TimeUnit.SECONDS) + .create(); + Transaction txn = pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.MINUTES).build().get(); + + // send batch message, the size is 5 + for (int i = 0; i < 5; i++) { + producer.newMessage(txn).value(("test" + i).getBytes()); + } + producer.flush(); + + CompletableFuture future = new CompletableFuture(); + // try to abort many times. + while (!txn.getState().equals(Transaction.State.ABORTED)) { + txn.abort().exceptionally(e -> { + future.completeExceptionally(e); + return null; + }); + } + assertTrue(!future.isCompletedExceptionally()); + + txn.abort().get(); + } + + @Test public void testNoEntryCanBeReadWhenRecovery() throws Exception { String topic = NAMESPACE1 + "/test"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java index 34e2362431dba..d8de09e5d7a0e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionClientConnectTest.java @@ -143,7 +143,7 @@ public void testPulsarClientCloseThenCloseTcClient() throws Exception { (TransactionMetaStoreHandler[]) field.get(transactionCoordinatorClient); for (TransactionMetaStoreHandler handler : handlers) { - 
handler.newTransactionAsync(10, TimeUnit.SECONDS).get(); + handler.newTransactionAsync(10, TimeUnit.SECONDS, null).get(); } for (TransactionMetaStoreHandler handler : handlers) { Field stateField = HandlerState.class.getDeclaredField("state"); @@ -155,7 +155,7 @@ public void testPulsarClientCloseThenCloseTcClient() throws Exception { method.setAccessible(true); assertEquals(method.invoke(handler).toString(), "Closed"); try { - handler.newTransactionAsync(10, TimeUnit.SECONDS).get(); + handler.newTransactionAsync(10, TimeUnit.SECONDS, null).get(); } catch (ExecutionException | InterruptedException e) { assertTrue(e.getCause() instanceof TransactionCoordinatorClientException.MetaStoreHandlerNotReadyException); diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java index 04b2d2fe6505d..7295a388e8943 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStoreProviderTest.java @@ -70,7 +70,7 @@ public TransactionMetadataStoreProviderTest(String providerClassName) throws Exc public void setup() throws Exception { this.tcId = new TransactionCoordinatorID(1L); this.store = this.provider.openStore(tcId, null, null, - null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), 0L, + null, new MLTransactionMetadataStoreTest.TransactionRecoverTrackerImpl(), null, 0L, new TxnLogBufferedWriterConfig(), transactionTimer).get(); } From 84f8a3430262a73605e9d0483c72737ca946a47e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 May 2023 16:30:33 +0800 Subject: [PATCH 06/13] fix. 
--- .../TransactionMetadataStoreService.java | 20 +++++++++++++++- .../pulsar/broker/service/ServerCnx.java | 8 +++++-- .../pulsar/broker/service/ServerCnxTest.java | 4 ++-- .../TransactionMetadataStoreServiceTest.java | 12 ++++++---- .../broker/transaction/TransactionTest.java | 24 ++++++++++++++++++- .../transaction/TransactionTestBase.java | 1 + .../pulsar/client/api/transaction/TxnID.java | 4 ---- .../impl/transaction/TransactionImpl.java | 2 +- .../TerminatedTransactionMetadataEntry.java | 20 +++++++++------- .../coordinator/TransactionMetadataStore.java | 10 ++++++++ .../impl/InMemTransactionMetadataStore.java | 5 ++++ .../impl/MLTransactionMetadataStore.java | 14 +++++++++-- 12 files changed, 99 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index ffa6b311d2e5c..241d95b7e41b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -273,6 +273,11 @@ public CompletableFuture removeTransactionMetadataStore(TransactionCoordin } } + public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills, + String owner) { + return newTransaction(tcId, timeoutInMills, owner, null); + } + public CompletableFuture newTransaction(TransactionCoordinatorID tcId, long timeoutInMills, String owner, String clientName) { TransactionMetadataStore store = stores.get(tcId); @@ -309,6 +314,15 @@ public CompletableFuture getTxnMeta(TxnID txnId) { return store.getTxnMeta(txnId); } + public CompletableFuture getTxnMeta(TxnID txnId, String clientName) { + TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId); + TransactionMetadataStore store = stores.get(tcId); + if (store == null) { + return FutureUtil.failedFuture(new 
CoordinatorNotFoundException(tcId)); + } + return store.getTxnMeta(txnId, clientName); + } + public long getLowWaterMark(TxnID txnID) { TransactionCoordinatorID tcId = getTcIdFromTxnId(txnID); TransactionMetadataStore store = stores.get(tcId); @@ -565,7 +579,11 @@ public Map getStores() { } public CompletableFuture verifyTxnOwnership(TxnID txnID, String checkOwner) { - return getTxnMeta(txnID) + return verifyTxnOwnership(txnID, checkOwner, null); + } + + public CompletableFuture verifyTxnOwnership(TxnID txnID, String checkOwner, String clientName) { + return getTxnMeta(txnID, clientName) .thenCompose(meta -> { // owner was null in the old versions or no auth enabled if (meta.getOwner() == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 51e01ffc61ce1..4c78c9a749bb0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2591,7 +2591,7 @@ protected void handleEndTxn(CommandEndTxn command) { TransactionMetadataStoreService transactionMetadataStoreService = service.pulsar().getTransactionMetadataStoreService(); - verifyTxnOwnership(txnID) + verifyTxnOwnership(txnID, clientName) .thenCompose(isOwner -> { if (!isOwner) { return failedFutureTxnNotOwned(txnID); @@ -2631,9 +2631,13 @@ private CompletableFuture isSuperUser() { } private CompletableFuture verifyTxnOwnership(TxnID txnID) { + return verifyTxnOwnership(txnID, null); + } + + private CompletableFuture verifyTxnOwnership(TxnID txnID, String clientName) { assert ctx.executor().inEventLoop(); return service.pulsar().getTransactionMetadataStoreService() - .verifyTxnOwnership(txnID, getPrincipal()) + .verifyTxnOwnership(txnID, getPrincipal(), clientName) .thenComposeAsync(isOwner -> { if (isOwner) { return CompletableFuture.completedFuture(true); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index c3bab634a42c1..36d91696f5463 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -3324,7 +3324,7 @@ public void sendEndTxnResponse() throws Exception { resetChannel(); setChannelConnected(); ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, - TxnAction.COMMIT)); + TxnAction.COMMIT, null)); channel.writeInbound(clientCommand); CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); @@ -3349,7 +3349,7 @@ public void sendEndTxnResponseFailed() throws Exception { resetChannel(); setChannelConnected(); ByteBuf clientCommand = Commands.serializeWithSize(Commands.newEndTxn(89L, 1L, 12L, - TxnAction.COMMIT)); + TxnAction.COMMIT, null)); channel.writeInbound(clientCommand); CommandEndTxnResponse response = (CommandEndTxnResponse) getResponse(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index f3b140ed482fc..cc8eec4b40cc0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -40,11 +40,15 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; +import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; +import 
org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState; import org.apache.pulsar.transaction.coordinator.TransactionSubscription; import org.apache.pulsar.transaction.coordinator.TxnMeta; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; +import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataPreserverImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; import org.apache.pulsar.transaction.coordinator.proto.TxnStatus; import org.awaitility.Awaitility; @@ -126,7 +130,7 @@ public void testCommitAndAbortTerminatedTransactionWithPreserverEnabled() throws TransactionMetadataStore transactionMetadataStore=transactionMetadataStoreService.getStores().get(TransactionCoordinatorID.get(0)); checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService() .getStores().get(TransactionCoordinatorID.get(0))); - TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null, "txnClient").get(); Assert.assertEquals(txnID0.getMostSigBits(), 0); transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient").get(); transactionMetadataStore.getTxnMeta(txnID0).handle((txnMeta, throwable) -> { @@ -138,7 +142,7 @@ public void testCommitAndAbortTerminatedTransactionWithPreserverEnabled() throws transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient").get(); - TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null, "txnClient").get(); 
Assert.assertEquals(txnID0.getMostSigBits(), 0); transactionMetadataStoreService.endTransaction(txnID1, TxnAction.ABORT_VALUE, false, "txnClient").get(); transactionMetadataStore.getTxnMeta(txnID1).handle((txnMeta, throwable) -> { @@ -151,7 +155,7 @@ public void testCommitAndAbortTerminatedTransactionWithPreserverEnabled() throws // create and commit third transaction, which will trigger the first transaction to be removed. - TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null, "txnClient").get(); Assert.assertEquals(txnID2.getMostSigBits(), 0); transactionMetadataStoreService.endTransaction(txnID2, TxnAction.COMMIT_VALUE, false, "txnClient").get(); transactionMetadataStore.getTxnMeta(txnID2).handle((txnMeta, throwable) -> { @@ -213,7 +217,7 @@ public void testCommitAndAbortTerminatedTransactionWithPreserverClosed() throws preserverField.setAccessible(true); TransactionMetadataPreserver preserver = (TransactionMetadataPreserver) preserverField.get(transactionMetadataStore); preserver.closeAsync().get(); - TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, "txnClient").get(); + TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null, "txnClient").get(); Assert.assertEquals(txnID0.getMostSigBits(), 0); transactionMetadataStoreService.endTransaction(txnID0, TxnAction.COMMIT_VALUE, false, "txnClient") .handle((txnMeta, throwable) -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 2757fc21ce93d..984a75f88a67a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -954,6 +954,17 @@ public void testEndTxnWhenCommittingOrAborting() throws Exception { @Test public void testConcurrentCommit() throws Exception { String topic = NAMESPACE1 + "/testConcurrentCommit"; + + // configure clientName + if (pulsarClient != null) { + pulsarClient.shutdown(); + } + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true) + .clientName("txnClient") + .build(); + @Cleanup Producer producer = pulsarClient .newProducer(Schema.BYTES) @@ -988,6 +999,17 @@ public void testConcurrentCommit() throws Exception { @Test public void testConcurrentAbort() throws Exception { String topic = NAMESPACE1 + "/testConcurrentCommit"; + + // configure clientName + if (pulsarClient != null) { + pulsarClient.shutdown(); + } + pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .enableTransaction(true) + .clientName("txnClient") + .build(); + @Cleanup Producer producer = pulsarClient .newProducer(Schema.BYTES) @@ -1692,7 +1714,7 @@ public void testGetTxnState() throws Exception { .build().get(); pulsarServiceList.get(0).getTransactionMetadataStoreService() .endTransaction(transaction.getTxnID(), 0, false); - transaction.commit(); + transaction.abort(); Transaction errorTxn = transaction; Awaitility.await().until(() -> errorTxn.getState() == Transaction.State.ERROR); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java index f45eda8d21fbe..c02f0c77ab202 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTestBase.java @@ -164,6 +164,7 @@ protected void startBroker() throws Exception { 
conf.setBrokerDeduplicationEnabled(true); conf.setTransactionBufferSnapshotMaxTransactionCount(2); conf.setTransactionBufferSnapshotMinTimeInMillis(2000); + conf.setTransactionMetaPersistCount(5); serviceConfigurationList.add(conf); PulsarTestContext.Builder testContextBuilder = diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java index ba859ecd1b791..58bda9d61ef54 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java @@ -54,10 +54,6 @@ public class TxnID implements Serializable { */ private long leastSigBits; - public TxnID(long mostSigBits, long leastSigBits) { - this.mostSigBits = mostSigBits; - this.leastSigBits = leastSigBits; - } @Override public String toString() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index 08548caed2c32..6ae96f3516d1f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -252,7 +252,7 @@ private CompletableFuture internalAbort() { @Override public TxnID getTxnID() { - return this.txnId; + return new TxnID(txnIdMostBits, txnIdLeastBits); } @Override diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java index 3680f090b8d50..846f56211ca0f 100644 --- 
a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java @@ -19,17 +19,21 @@ package org.apache.pulsar.transaction.coordinator; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; import java.util.LinkedList; -@AllArgsConstructor -@NoArgsConstructor -@Getter -@Setter public class TerminatedTransactionMetadataEntry { LinkedList txnMetas; + + public TerminatedTransactionMetadataEntry() { + this.txnMetas = new LinkedList<>(); + } + + public LinkedList getTxnMetas() { + return new LinkedList<>(txnMetas); + } + + public void setTxnMetas(LinkedList txnMetas) { + this.txnMetas = new LinkedList<>(txnMetas); + } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java index 270b7a1eb4119..4e5777fc425ef 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataStore.java @@ -52,6 +52,16 @@ default CompletableFuture getTxnStatus(TxnID txnid) { */ CompletableFuture getTxnMeta(TxnID txnid); + /** + * Query the {@link TxnMeta} of a given transaction txnid. + * + * @param txnid transaction id + * @param clientName the client name that creates the transaction + * @return a future represents the result of this operation. + * it returns {@link TxnMeta} of the given transaction. 
+ */ + CompletableFuture getTxnMeta(TxnID txnid, String clientName); + /** * Query the {@link TxnMeta} of a given transaction txnID * and clientName from TransactionMetadataPreserver. diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java index 751cb9fa567cd..c09c597e7a7b7 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/InMemTransactionMetadataStore.java @@ -63,6 +63,11 @@ class InMemTransactionMetadataStore implements TransactionMetadataStore { @Override public CompletableFuture getTxnMeta(TxnID txnid) { + return getTxnMeta(txnid, null); + } + + @Override + public CompletableFuture getTxnMeta(TxnID txnid, String clientName) { CompletableFuture getFuture = new CompletableFuture<>(); TxnMetaImpl txn = transactions.get(txnid); if (null == txn) { diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index b8c593286323b..df1f92267b285 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -65,6 +65,7 @@ import org.slf4j.LoggerFactory; import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; /** * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
@@ -280,9 +281,18 @@ public CompletableFuture getTxnStatus(TxnID txnID) { } @Override - public CompletableFuture getTxnMeta(TxnID txnID) { + public CompletableFuture getTxnMeta(TxnID txnid) { + return getTxnMeta(txnid, null); + } + + @Override + public CompletableFuture getTxnMeta(TxnID txnID, String clientName) { Pair> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits()); CompletableFuture completableFuture = new CompletableFuture<>(); + if (txnMetaListPair == null && transactionMetadataPreserverEnabled() && + clientName != null && isNotBlank(clientName)) { + completableFuture.complete(getTxnMetaFromPreserver(txnID, clientName)); + } if (txnMetaListPair == null) { completableFuture.completeExceptionally(new TransactionNotFoundException(txnID)); } else { @@ -345,7 +355,7 @@ public CompletableFuture newTransaction(long timeOut, String owner, Strin } transactionMetadataEntry.setOwner(owner); } - if(clientName != null && !StringUtils.isBlank(owner)) { + if(clientName != null && !StringUtils.isBlank(clientName)) { transactionMetadataEntry.setClientName(clientName); } transactionLog.append(transactionMetadataEntry) From e5ec1435620f810ae1ba9a05349bce582f96a02a Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 May 2023 17:44:36 +0800 Subject: [PATCH 07/13] fix license. 
--- .../coordinator/TerminatedTransactionMetadataEntry.java | 3 +-- .../transaction/coordinator/TransactionMetadataPreserver.java | 2 +- .../coordinator/impl/MLTransactionMetadataPreserverImpl.java | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java index 846f56211ca0f..da9c188e795c6 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TerminatedTransactionMetadataEntry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pulsar.transaction.coordinator; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java index e6178e22d175e..952ae10736478 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index 59363302016ea..d8ffa003464df 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information From c8ccf515b64b0e850599e74985896bd24d947a5e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 19 May 2023 18:28:15 +0800 Subject: [PATCH 08/13] fix checkstyle. 
--- .../pulsar/broker/ServiceConfiguration.java | 9 +++--- .../TransactionMetadataStoreService.java | 11 ++++--- .../pulsar/broker/service/ServerCnx.java | 4 +-- .../TransactionRecoverTrackerImpl.java | 2 +- .../pulsar/client/api/transaction/TxnID.java | 2 -- .../pulsar/common/protocol/Commands.java | 4 +-- .../TransactionMetadataPreserver.java | 3 +- .../MLTransactionMetadataPreserverImpl.java | 31 +++++++++---------- .../impl/MLTransactionMetadataStore.java | 16 +++++----- .../coordinator/impl/TxnMetaImpl.java | 3 +- 10 files changed, 41 insertions(+), 44 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 1fc9424c2af32..90a0c04ec8ca5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2997,23 +2997,24 @@ The delayed message index time step(in seconds) in per bucket snapshot segment, @FieldContext( category = CATEGORY_TRANSACTION, doc = "Max number of txnMeta of aborted transaction to persist in broker." - + "If the number of aborted transaction is greater than this value, the oldest aborted transaction will be " + + "If the number of aborted transaction is greater than this value," + + " the oldest aborted transaction will be " + "removed from the cache and persisted in the store." + "default value is 0, disable persistence of aborted transaction." ) - private int TransactionMetaPersistCount = 0; + private int transactionMetaPersistCount = 0; @FieldContext( category = CATEGORY_TRANSACTION, doc = "Time in hour to persist the transaction metadata in TransactionMetadataPreserver." 
) - private long TransactionMetaPersistTimeInHour = 72; + private long transactionMetaPersistTimeInHour = 72; @FieldContext( category = CATEGORY_TRANSACTION, doc = "Interval in seconds to check the expired transaction in TransactionMetadataPreserver." ) - private long TransactionMetaExpireCheckIntervalInSecond = 300; + private long transactionMetaExpireCheckIntervalInSecond = 300; @FieldContext( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 241d95b7e41b9..ec48517fef082 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -150,7 +150,8 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc pulsarService.getClient()); preserver.replay(); } catch (Throwable e) { - LOG.error("Failed to create transaction metadata preserver for tcId {}, reason:{}", tcId, e); + LOG.error("Failed to create transaction metadata preserver for tcId {}, reason:{}", + tcId, e); preserver = new MLTransactionMetadataPreserverImpl(); } openTransactionMetadataStore(tcId, preserver, timeoutTracker, recoverTracker).thenAccept( @@ -422,8 +423,8 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, && (txnMeta.status() == TxnStatus.COMMITTED || txnMeta.status() == TxnStatus.COMMITTING)) { if (LOG.isDebugEnabled()) { - LOG.debug("try to commit a transaction that is already committed. " + - "TxnId : {}, clientName:{}.", txnID, clientName); + LOG.debug("try to commit a transaction that is already committed. 
" + + "TxnId : {}, clientName:{}.", txnID, clientName); } future.complete(null); return; @@ -431,8 +432,8 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, && (txnMeta.status() == TxnStatus.ABORTED || txnMeta.status() == TxnStatus.ABORTING)) { if (LOG.isDebugEnabled()) { - LOG.debug("try to abort a transaction that is already aborted. " + - "TxnId : {}, clientName:{}.", txnID, clientName); + LOG.debug("try to abort a transaction that is already aborted. " + + "TxnId : {}, clientName:{}.", txnID, clientName); } future.complete(null); return; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4c78c9a749bb0..26e784c9f5028 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -2458,7 +2458,7 @@ protected void handleNewTxn(CommandNewTxn command) { final long requestId = command.getRequestId(); final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId()); final String clientName; - if(command.hasClientName()) { + if (command.hasClientName()) { clientName = command.getClientName(); } else { clientName = null; @@ -2572,7 +2572,7 @@ protected void handleEndTxn(CommandEndTxn command) { final long requestId = command.getRequestId(); final int txnAction = command.getTxnAction().getValue(); final String clientName; - if(command.hasClientName()) { + if (command.hasClientName()) { clientName = command.getClientName(); } else { clientName = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java index 7b22bc1eeb2bb..92d6bcf59227d 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/recover/TransactionRecoverTrackerImpl.java @@ -126,7 +126,7 @@ public void appendOpenTransactionToTimeoutTracker() { @Override public void handleCommittingAndAbortingTransaction() { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("Handle committing and aborting transaction, tcId: {}, committingTransactions: {}, " + "abortingTransactions: {}", tcId, committingTransactions, abortingTransactions); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java index 58bda9d61ef54..c759ac8dc7e7e 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java @@ -20,10 +20,8 @@ import java.io.Serializable; import java.util.Objects; -import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.Getter; import lombok.NoArgsConstructor; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 1e99502301768..7d963f0e29508 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1338,7 +1338,7 @@ public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds, String .setTcId(tcId) .setRequestId(requestId) .setTxnTtlSeconds(ttlSeconds); - if(clientName != null) { + if (clientName != null) { commandNewTxn.setClientName(clientName); } 
return serializeWithSize(cmd); @@ -1446,7 +1446,7 @@ public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, long tx .setRequestId(requestId) .setTxnidLeastBits(txnIdLeastBits).setTxnidMostBits(txnIdMostBits) .setTxnAction(txnAction); - if(clientName != null) { + if (clientName != null) { commandEndTxn.setClientName(clientName); } return cmd; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java index 952ae10736478..444ef3df8f207 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java @@ -18,11 +18,10 @@ */ package org.apache.pulsar.transaction.coordinator; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; -import java.util.concurrent.CompletableFuture; - /** * An interface for persist metadata of aborted txn and recover from a compacted topic. 
*/ diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index d8ffa003464df..43cdc07836178 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -18,7 +18,14 @@ */ package org.apache.pulsar.transaction.coordinator.impl; - +import static org.apache.commons.lang3.StringUtils.isBlank; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -38,15 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; - -import static org.apache.commons.lang3.StringUtils.isBlank; /** * A transaction metadata preserver implementation. @@ -72,7 +70,7 @@ public class MLTransactionMetadataPreserverImpl implements TransactionMetadataPr public final Set needToFlush = new HashSet<>(); /** - * do not enable terminated transaction metadata persist + * do not enable terminated transaction metadata persist. 
*/ public MLTransactionMetadataPreserverImpl() { this.transactionMetaPersistCount = 0; @@ -93,7 +91,8 @@ public MLTransactionMetadataPreserverImpl(TransactionCoordinatorID tcID, || pulsarClient.isClosed() || tcID == null) { // do not enable terminated transaction metadata persist log.info("Transaction metadata preserver init failed, transaction metadata persist count is {}, " - + "pulsar client is null or closed, or transaction coordinator id is null.", transactionMetaPersistCount); + + "pulsar client is null or closed, or transaction coordinator id is null.", + transactionMetaPersistCount); this.transactionMetaPersistCount = 0; this.transactionMetaPersistTimeInMS = 0; this.transactionMetaExpireCheckIntervalInMS = 0; @@ -238,7 +237,7 @@ public void flush(String clientName) throws CoordinatorException.PreserverClosed } TerminatedTransactionMetadataEntry entry = new TerminatedTransactionMetadataEntry(); entry.setTxnMetas(terminatedTxnMetaList.get(clientName)); - try{ + try { producer.newMessage().key(clientName).value(entry).send(); } catch (PulsarClientException e) { log.error("Flush transaction metadata failed, client name:{}, tcID:{}, reason:{}.", @@ -261,7 +260,7 @@ public TxnMeta getTxnMeta(TxnID txnID, String clientName) { // this method is not executed in single thread, so we need to ensure // NPE exception will not be thrown. 
Map txnIDTxnMetaMap = terminatedTxnMetaMap.get(clientName); - if(txnIDTxnMetaMap == null) { + if (txnIDTxnMetaMap == null) { return null; } return txnIDTxnMetaMap.get(txnID); @@ -269,7 +268,7 @@ public TxnMeta getTxnMeta(TxnID txnID, String clientName) { @Override public void expireTransactionMetadata() { - if(!enabled()) { + if (!enabled()) { return; } if (log.isDebugEnabled()) { @@ -291,7 +290,7 @@ public void expireTransactionMetadata() { break; } } - if(txnMetaList.isEmpty()) { + if (txnMetaList.isEmpty()) { // delete the transaction metadata from the system topic __terminated_txn_state // producer.newMessage().key(clientName).value(null).send(); terminatedTxnMetaList.remove(clientName); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index df1f92267b285..a6a0161987366 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.transaction.coordinator.impl; +import static org.apache.commons.lang3.StringUtils.isBlank; +import static org.apache.commons.lang3.StringUtils.isNotBlank; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.MoreExecutors; import io.netty.util.Timeout; @@ -64,8 +66,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.commons.lang3.StringUtils.isBlank; -import static org.apache.commons.lang3.StringUtils.isNotBlank; /** * The provider that offers managed ledger implementation of {@link TransactionMetadataStore}. 
@@ -289,8 +289,8 @@ public CompletableFuture getTxnMeta(TxnID txnid) { public CompletableFuture getTxnMeta(TxnID txnID, String clientName) { Pair> txnMetaListPair = txnMetaMap.get(txnID.getLeastSigBits()); CompletableFuture completableFuture = new CompletableFuture<>(); - if (txnMetaListPair == null && transactionMetadataPreserverEnabled() && - clientName != null && isNotBlank(clientName)) { + if (txnMetaListPair == null && transactionMetadataPreserverEnabled() + && clientName != null && isNotBlank(clientName)) { completableFuture.complete(getTxnMetaFromPreserver(txnID, clientName)); } if (txnMetaListPair == null) { @@ -313,7 +313,7 @@ public boolean transactionMetadataPreserverEnabled() { @Override public CompletableFuture appendTxnMetaToPreserver(TxnMeta txnMeta, String clientName) { - if(!transactionMetadataPreserver.enabled() || isBlank(clientName)) { + if (!transactionMetadataPreserver.enabled() || isBlank(clientName)) { return CompletableFuture.completedFuture(null); } CompletableFuture completableFuture = new CompletableFuture<>(); @@ -355,7 +355,7 @@ public CompletableFuture newTransaction(long timeOut, String owner, Strin } transactionMetadataEntry.setOwner(owner); } - if(clientName != null && !StringUtils.isBlank(clientName)) { + if (clientName != null && !StringUtils.isBlank(clientName)) { transactionMetadataEntry.setClientName(clientName); } transactionLog.append(transactionMetadataEntry) @@ -502,7 +502,7 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, // is useless, we will remove it to save memory and reduce message size. 
txnMeta.clearProducedPartitions(); txnMeta.clearAckedPartitions(); - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("flush txnId:{} metadata before update status to {}", txnID, newStatus.name()); } @@ -539,7 +539,7 @@ public CompletableFuture updateTxnStatus(TxnID txnID, TxnStatus newStatus, this.transactionTimeoutCount.increment(); } if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) { - if(log.isDebugEnabled()) { + if (log.isDebugEnabled()) { log.debug("TxnID : " + txnMeta.id().toString() + " update txn status to " + newStatus.name()); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java index ef73b7cac69f5..7410bcbeb2464 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnMetaImpl.java @@ -18,13 +18,12 @@ */ package org.apache.pulsar.transaction.coordinator.impl; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Getter; import lombok.Setter; import org.apache.pulsar.client.api.transaction.TxnID; From f75fe54bbff639a9b2831333ac1329c32a460c59 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 24 May 2023 14:08:07 +0800 Subject: [PATCH 09/13] fix ConcurrentModificationException in preserver. 
--- .../impl/MLTransactionMetadataPreserverImpl.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index 43cdc07836178..9ac5874a93de9 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -21,6 +21,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -275,7 +276,11 @@ public void expireTransactionMetadata() { log.debug("Start to check transaction metadata expire, tcID:{}.", tcID); } long now = System.currentTimeMillis(); - for (Map.Entry> entry : terminatedTxnMetaList.entrySet()) { + Iterator>> iterator = + terminatedTxnMetaList.entrySet().iterator(); + Map.Entry> entry; + while(iterator.hasNext()) { + entry = iterator.next(); String clientName = entry.getKey(); LinkedList txnMetaList = entry.getValue(); Map txnIDTxnMetaMap = terminatedTxnMetaMap.get(clientName); @@ -293,7 +298,7 @@ public void expireTransactionMetadata() { if (txnMetaList.isEmpty()) { // delete the transaction metadata from the system topic __terminated_txn_state // producer.newMessage().key(clientName).value(null).send(); - terminatedTxnMetaList.remove(clientName); + iterator.remove(); terminatedTxnMetaMap.remove(clientName); } } From c3e4df0b7143795d97ca117d11c6b2557653c9f9 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Wed, 24 May 2023 15:06:13 +0800 Subject: [PATCH 10/13] fix checkstyle. 
--- .../coordinator/impl/MLTransactionMetadataPreserverImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index 9ac5874a93de9..f89d33dbc5be5 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -279,7 +279,7 @@ public void expireTransactionMetadata() { Iterator>> iterator = terminatedTxnMetaList.entrySet().iterator(); Map.Entry> entry; - while(iterator.hasNext()) { + while (iterator.hasNext()) { entry = iterator.next(); String clientName = entry.getKey(); LinkedList txnMetaList = entry.getValue(); From 8b51c896ac280bf29207fee544e7b87c183d4367 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 16 May 2024 15:20:48 +0800 Subject: [PATCH 11/13] throw InvalidTxnStatusException instead of TransactionNotFound. 
--- .../broker/TransactionMetadataStoreService.java | 11 +++++++++++ .../service/TransactionMetadataStoreServiceTest.java | 8 ++++++++ 2 files changed, 19 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index 4b6d96a863ed9..f06c9ebe80385 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -30,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -437,6 +438,16 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout, } future.complete(null); return; + } else if (txnMeta != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to commit a aborted txn or abort a committed txn. " + + "TxnId : {}, clientName:{}.", txnID, clientName); + } + TxnStatus expectStatus = txnAction == TxnAction.COMMIT_VALUE + ? TxnStatus.COMMITTED : TxnStatus.ABORTED; + future.completeExceptionally(new CompletionException( + new InvalidTxnStatusException(txnID, expectStatus, txnMeta.status()))); + return; } } LOG.error("End transaction fail! 
TxnId : {}, " diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java index cc8eec4b40cc0..596e2ce04b91d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java @@ -153,6 +153,14 @@ public void testCommitAndAbortTerminatedTransactionWithPreserverEnabled() throws // abort again. transactionMetadataStoreService.endTransaction(txnID1, TxnAction.ABORT_VALUE, false, "txnClient").get(); + // commit the aborted transaction will throw InvalidTxnStatusException. + transactionMetadataStoreService.endTransaction(txnID1, TxnAction.COMMIT_VALUE, false, "txnClient") + .handle((txnMeta, throwable) -> { + Assert.assertNotNull(throwable); + Assert.assertTrue(FutureUtil.unwrapCompletionException(throwable) + instanceof CoordinatorException.InvalidTxnStatusException); + return null; + }).get(); // create and commit third transaction, which will trigger the first transaction to be removed. TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000, null, "txnClient").get(); From ba8f7fa81132a030fcc7fc652fab3de3e2abfa44 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 16 May 2024 15:36:08 +0800 Subject: [PATCH 12/13] Fix preserver recovery failed. 
--- .../TransactionMetadataStoreService.java | 42 +++++++++++-------- .../TransactionMetadataPreserver.java | 4 +- .../MLTransactionMetadataPreserverImpl.java | 3 +- 3 files changed, 30 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java index f06c9ebe80385..5499427c46745 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/TransactionMetadataStoreService.java @@ -152,7 +152,10 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc } catch (Throwable e) { LOG.error("Failed to create transaction metadata preserver for tcId {}, reason:{}", tcId, e); - preserver = new MLTransactionMetadataPreserverImpl(); + completableFuture.completeExceptionally(e); + tcLoadSemaphore.release(); + failPendingConnectRequests(e, deque); + return; } openTransactionMetadataStore(tcId, preserver, timeoutTracker, recoverTracker).thenAccept( store -> internalPinnedExecutor.execute(() -> { @@ -191,22 +194,7 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc // release before handle request queue, //in order to client reconnect infinite loop tcLoadSemaphore.release(); - long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; - while (true) { - // prevent thread in a busy loop. 
- if (System.currentTimeMillis() < endTime) { - CompletableFuture future = deque.poll(); - if (future != null) { - // this means that this tc client connection connect fail - future.completeExceptionally(realCause); - } else { - break; - } - } else { - deque.clear(); - break; - } - } + failPendingConnectRequests(e, deque); LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e); }); return null; @@ -230,6 +218,26 @@ public CompletableFuture handleTcClientConnect(TransactionCoordinatorID tc return completableFuture; } + private void failPendingConnectRequests(Throwable e, Deque> deque) { + Throwable realCause = FutureUtil.unwrapCompletionException(e); + long endTime = System.currentTimeMillis() + HANDLE_PENDING_CONNECT_TIME_OUT; + while (true) { + // prevent thread in a busy loop. + if (System.currentTimeMillis() < endTime) { + CompletableFuture future = deque.poll(); + if (future != null) { + // this means that this tc client connection connect fail + future.completeExceptionally(realCause); + } else { + break; + } + } else { + deque.clear(); + break; + } + } + } + public CompletableFuture openTransactionMetadataStore(TransactionCoordinatorID tcId, TransactionMetadataPreserver preserver, diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java index 444ef3df8f207..7c786e9d116a3 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java @@ -19,6 +19,8 @@ package org.apache.pulsar.transaction.coordinator; import java.util.concurrent.CompletableFuture; + +import org.apache.pulsar.client.api.PulsarClientException; import 
org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException; @@ -29,7 +31,7 @@ public interface TransactionMetadataPreserver { /** * Replay transaction metadata to initialize the terminatedTxnMetaMap. */ - void replay(); + void replay() throws PulsarClientException; /** * Close the transaction metadata preserver. diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index f89d33dbc5be5..a1792841d50da 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -135,7 +135,7 @@ private static String getTransactionMetadataPersistTopicName(TransactionCoordina * Replay transaction metadata to initialize the terminatedTxnMetaMap. */ @Override - public void replay() { + public void replay() throws PulsarClientException { if (!enabled()) { log.info("Transaction metadata preserver is not enabled, do not replay transaction metadata."); return; @@ -159,6 +159,7 @@ public void replay() { } catch (Exception e) { // Though replay transaction metadata failed, the transaction coordinator can still work. 
log.error("Replay transaction metadata failed, tcID:{}, reason:{}.", tcID, e); + throw e; } finally { reader.closeAsync(); } From b25128a680b1916927fe05d1d12343f7d8d3cec8 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Thu, 16 May 2024 15:47:53 +0800 Subject: [PATCH 13/13] [Feature] add metric for preserver --- ...AggregatedTransactionCoordinatorStats.java | 5 +++ .../prometheus/TransactionAggregator.java | 8 +++++ .../TransactionMetadataPreserver.java | 6 ++++ .../MLTransactionMetadataPreserverImpl.java | 32 ++++++++++++++++++- .../impl/MLTransactionMetadataStore.java | 8 +++++ .../impl/TransactionMetadataStoreStats.java | 6 ++++ 6 files changed, 64 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java index f417715ffd619..8aeddb4101427 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/AggregatedTransactionCoordinatorStats.java @@ -19,6 +19,9 @@ package org.apache.pulsar.broker.stats.prometheus; public class AggregatedTransactionCoordinatorStats { + public long tcRecoveryTime; + + public long preserverRecoveryTime; public int actives; @@ -35,6 +38,8 @@ public class AggregatedTransactionCoordinatorStats { public long[] executionLatency; public void reset() { + tcRecoveryTime = 0; + preserverRecoveryTime = 0; actives = 0; committedCount = 0; abortedCount = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java index 3da061f6ffef2..3ef9c94b96cc8 100644 --- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/TransactionAggregator.java @@ -84,6 +84,10 @@ public static void generate(PulsarService pulsar, PrometheusMetricStreams stream transactionCoordinatorStats.reset(); TransactionMetadataStoreStats transactionMetadataStoreStats = transactionMetadataStore.getMetadataStoreStats(); + transactionCoordinatorStats.tcRecoveryTime = + transactionMetadataStoreStats.getTcRecoverTime(); + transactionCoordinatorStats.preserverRecoveryTime = + transactionMetadataStoreStats.getPreserverRecoverTime(); transactionCoordinatorStats.actives = transactionMetadataStoreStats.getActives(); transactionCoordinatorStats.committedCount = @@ -239,6 +243,10 @@ private static void printManageLedgerStats(PrometheusMetricStreams stream, Strin static void printTransactionCoordinatorStats(PrometheusMetricStreams stream, String cluster, AggregatedTransactionCoordinatorStats stats, long coordinatorId) { + writeMetric(stream, "pulsar_txn_tc_recovery_time_ms", stats.tcRecoveryTime, cluster, + coordinatorId); + writeMetric(stream, "pulsar_txn_preserver_recovery_time_ms", stats.preserverRecoveryTime, + cluster, coordinatorId); writeMetric(stream, "pulsar_txn_active_count", stats.actives, cluster, coordinatorId); writeMetric(stream, "pulsar_txn_committed_total", stats.committedCount, cluster, diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java index 7c786e9d116a3..ed04be65179a9 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/TransactionMetadataPreserver.java @@ 
-78,4 +78,10 @@ public interface TransactionMetadataPreserver { * @return the interval of expiring the transaction metadata in MS. */ long getExpireOldTransactionMetadataIntervalMS(); + + /** + * Get the time used for recovery in MS. + * @return + */ + long getRecoveryTime(); } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java index a1792841d50da..dca19bed943a7 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataPreserverImpl.java @@ -27,6 +27,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; + +import io.prometheus.client.Summary; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; @@ -38,6 +40,7 @@ import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.RecoverTimeRecord; import org.apache.pulsar.transaction.coordinator.TerminatedTransactionMetadataEntry; import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; import org.apache.pulsar.transaction.coordinator.TransactionMetadataPreserver; @@ -70,6 +73,14 @@ public class MLTransactionMetadataPreserverImpl implements TransactionMetadataPr public final Set needToFlush = new HashSet<>(); + public final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); + + private static final double[] QUANTILES = {0.50, 0.95, 0.99, 1}; + + private static final Summary produceLatency = 
buildSummary("pulsar_txn_preserver_produce_latency", "-", + new String[]{"coordinator_id"}); + + /** * do not enable terminated transaction metadata persist. */ @@ -108,6 +119,7 @@ public MLTransactionMetadataPreserverImpl(TransactionCoordinatorID tcID, this.transactionMetaExpireCheckIntervalInMS = transactionMetadataExpireIntervalInSecond * 1000; this.transactionMetaPersistTimeInMS = transactionMetaPersistTimeInHour * 60 * 60 * 1000; String topicName = getTransactionMetadataPersistTopicName(tcID); + recoverTime.setRecoverStartTime(System.currentTimeMillis()); this.producer = pulsarClient.newProducer(Schema.JSON(TerminatedTransactionMetadataEntry.class)) .topic(topicName) .createAsync().thenCompose(producer -> { @@ -155,6 +167,7 @@ public void replay() throws PulsarClientException { log.debug("Replay transaction metadata, tcID:{}, client name:{}.", tcID, clientName); } } + recoverTime.setRecoverEndTime(System.currentTimeMillis()); log.info("Replay transaction metadata successfully, tcID:{}.", tcID); } catch (Exception e) { // Though replay transaction metadata failed, the transaction coordinator can still work. @@ -185,7 +198,7 @@ public CompletableFuture closeAsync() { * can improve the produce efficiency with deduplication of those messages with * same clientName. 
* - * @param txnMeta + * @param txnMeta the transaction metadata * @return */ @Override @@ -240,7 +253,9 @@ public void flush(String clientName) throws CoordinatorException.PreserverClosed TerminatedTransactionMetadataEntry entry = new TerminatedTransactionMetadataEntry(); entry.setTxnMetas(terminatedTxnMetaList.get(clientName)); try { + long start = System.currentTimeMillis(); producer.newMessage().key(clientName).value(entry).send(); + produceLatency.labels(String.valueOf(tcID)).observe(System.currentTimeMillis() - start); } catch (PulsarClientException e) { log.error("Flush transaction metadata failed, client name:{}, tcID:{}, reason:{}.", clientName, tcID, e); @@ -309,4 +324,19 @@ public void expireTransactionMetadata() { public long getExpireOldTransactionMetadataIntervalMS() { return transactionMetaExpireCheckIntervalInMS; } + + + @Override + public long getRecoveryTime() { + return recoverTime.getRecoverEndTime() - recoverTime.getRecoverStartTime(); + } + + private static Summary buildSummary(String name, String help, String[] labelNames) { + Summary.Builder builder = Summary.build(name, help) + .labelNames(labelNames); + for (double quantile : QUANTILES) { + builder.quantile(quantile, 0.01D); + } + return builder.register(); + } } \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java index a6a0161987366..09311b526eed0 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java @@ -638,6 +638,14 @@ public CompletableFuture closeAsync() { @Override public TransactionMetadataStoreStats getMetadataStoreStats() { 
this.transactionMetadataStoreStats.setCoordinatorId(tcID.getId()); + this.transactionMetadataStoreStats.tcRecoverTime = recoverTime.getRecoverEndTime() + - recoverTime.getRecoverStartTime(); + if (transactionMetadataPreserverEnabled()) { + this.transactionMetadataStoreStats.preserverRecoverTime = + transactionMetadataPreserver.getRecoveryTime(); + } else { + this.transactionMetadataStoreStats.preserverRecoverTime = 0L; + } this.transactionMetadataStoreStats.setActives(txnMetaMap.size()); this.transactionMetadataStoreStats.setCreatedCount(this.createdTransactionCount.longValue()); this.transactionMetadataStoreStats.setCommittedCount(this.committedTransactionCount.longValue()); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java index 8b8be92ae1691..4de067b3fe29c 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TransactionMetadataStoreStats.java @@ -33,6 +33,12 @@ public class TransactionMetadataStoreStats { /** The transaction coordinatorId. */ private long coordinatorId; + /** The time used for TC recovery. */ + public long tcRecoverTime; + + /** The time used for preserver recovery. */ + public long preserverRecoverTime; + /** The active transactions. */ private int actives;