From a4c6c4187d4a43326ec421577e394b596ab2cac3 Mon Sep 17 00:00:00 2001 From: YingQun Zhong Date: Tue, 1 Nov 2022 09:35:21 +0800 Subject: [PATCH 1/5] [improve][broker] make managedLedgerOffloadedReadPriority compatible with broker property (#17803) --- conf/broker.conf | 4 ++ .../policies/data/OffloadPoliciesImpl.java | 9 +++- .../policies/data/OffloadPoliciesTest.java | 46 +++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 76f75a7d82530..85ffc11d34f42 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000 # (default is -1, which is disabled) managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1 +# Read priority when ledgers exists in both bookkeeper and the second layer storage +# (tiered-storage-first/bookkeeper-first, default is tiered-storage-first) +managedLedgerDataReadPriority=tiered-storage-first + # The number of seconds before triggering automatic offload to long term storage # (default is -1, which is disabled) managedLedgerOffloadThresholdInSeconds=-1 diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java index 9b4e8506c89f6..4eee063cc9618 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPoliciesImpl.java @@ -75,6 +75,7 @@ public class OffloadPoliciesImpl implements Serializable, OffloadPolicies { public static final String OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE = "managedLedgerOffloadAutoTriggerSizeThresholdBytes"; public static final String DELETION_LAG_NAME_IN_CONF_FILE = "managedLedgerOffloadDeletionLagMs"; + public static final String DATA_READ_PRIORITY_NAME_IN_CONF_FILE = "managedLedgerDataReadPriority"; public static final OffloadedReadPriority DEFAULT_OFFLOADED_READ_PRIORITY = OffloadedReadPriority.TIERED_STORAGE_FIRST; @@ -261,9 +262,10 @@ public void compatibleWithBrokerConfigFile(Properties properties) { Long.parseLong(properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE))); } - if (properties.containsKey("managedLedgerDataReadPriority")) { + if (!properties.containsKey("managedLedgerOffloadedReadPriority") + && properties.containsKey(DATA_READ_PRIORITY_NAME_IN_CONF_FILE)) { setManagedLedgerOffloadedReadPriority( - OffloadedReadPriority.fromString(properties.getProperty("managedLedgerDataReadPriority"))); + OffloadedReadPriority.fromString(properties.getProperty(DATA_READ_PRIORITY_NAME_IN_CONF_FILE))); } } @@ -479,6 +481,9 @@ private static Object getCompatibleValue(Properties properties, Field field) { } else if (field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) { object = properties.getProperty("managedLedgerOffloadDeletionLagInMillis", properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE)); + } else if (field.getName().equals("managedLedgerOffloadedReadPriority")) { + object = properties.getProperty("managedLedgerOffloadedReadPriority", + properties.getProperty(DATA_READ_PRIORITY_NAME_IN_CONF_FILE)); } else { object = properties.get(field.getName()); } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java index f6496af2cfe72..c0d45389821f9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/OffloadPoliciesTest.java @@ -308,4 +308,50 @@ public void mergeTest() { Assert.assertNull(offloadPolicies.getS3ManagedLedgerOffloadRegion()); } + + @Test + public void brokerPropertyCompatibleTest() { + final Long brokerOffloadThreshold = 0L; + final Long brokerDeletionLag = 2000L; + final String brokerReadPriority = "bookkeeper-first"; + + // 1. mergeConfiguration test + Properties brokerProperties = new Properties(); + brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes", "" + brokerOffloadThreshold); + brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" + brokerDeletionLag); + brokerProperties.setProperty("managedLedgerDataReadPriority", "" + brokerReadPriority); + OffloadPoliciesImpl offloadPolicies = + OffloadPoliciesImpl.mergeConfiguration(null, null, brokerProperties); + Assert.assertNotNull(offloadPolicies); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), brokerOffloadThreshold); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), brokerDeletionLag); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(), brokerReadPriority); + + // 2. compatibleWithBrokerConfigFile test + brokerProperties = new Properties(); + brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes", "" + brokerOffloadThreshold); + brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" + brokerDeletionLag); + brokerProperties.setProperty("managedLedgerDataReadPriority", "" + brokerReadPriority); + offloadPolicies = OffloadPoliciesImpl.create(brokerProperties); + Assert.assertNotNull(offloadPolicies); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), brokerOffloadThreshold); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), brokerDeletionLag); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(), brokerReadPriority); + + + brokerProperties = new Properties(); + // 2.1 offload properties name in OffloadPoliciesImpl + brokerProperties.setProperty("managedLedgerOffloadThresholdInBytes", "" + (brokerOffloadThreshold)); + brokerProperties.setProperty("managedLedgerOffloadDeletionLagInMillis", "" + (brokerDeletionLag)); + brokerProperties.setProperty("managedLedgerOffloadedReadPriority", "" + (brokerReadPriority)); + // 2.2 offload properties name in conf file + brokerProperties.setProperty("managedLedgerOffloadAutoTriggerSizeThresholdBytes", "" + brokerOffloadThreshold + 30); + brokerProperties.setProperty("managedLedgerOffloadDeletionLagMs", "" + brokerDeletionLag + 30); + brokerProperties.setProperty("managedLedgerDataReadPriority", "" + "tiered-storage-first"); + offloadPolicies = OffloadPoliciesImpl.create(brokerProperties); + Assert.assertNotNull(offloadPolicies); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadThresholdInBytes(), brokerOffloadThreshold); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDeletionLagInMillis(), brokerDeletionLag); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadedReadPriority().toString(), brokerReadPriority); + } } From 8308b8882d9a6c1704fa0d6c2ecf203296ce6ea8 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Tue, 1 Nov 2022 11:45:19 +0800 Subject: [PATCH 2/5] [cleanup] remove duplicate `sendSuccess` and `sendError` in `PulsarCommandSender` (#18250) change to `sendSuccessResponse` and `sendErrorResponse` --- .../org/apache/pulsar/broker/service/Consumer.java | 4 ++-- .../pulsar/broker/service/PulsarCommandSender.java | 4 ---- .../pulsar/broker/service/PulsarCommandSenderImpl.java | 10 ---------- .../apache/pulsar/broker/service/TopicListService.java | 2 +- 4 files changed, 3 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index dbad30033b8f7..f43f16ef5871f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -395,10 +395,10 @@ public void doUnsubscribe(final long requestId) { subscription.doUnsubscribe(this).thenAccept(v -> { log.info("Unsubscribed successfully from {}", subscription); cnx.removedConsumer(this); - cnx.getCommandSender().sendSuccess(requestId); + cnx.getCommandSender().sendSuccessResponse(requestId); }).exceptionally(exception -> { log.warn("Unsubscribe failed for {}", subscription, exception); - cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception), + cnx.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage()); return null; }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java index 5659e84f1e117..50579b3d741ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java @@ -72,10 +72,6 @@ void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boo void sendActiveConsumerChange(long consumerId, boolean isActive); - void sendSuccess(long requestId); - - void sendError(long requestId, ServerError error, String message); - void sendReachedEndOfTopic(long consumerId); boolean sendTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java index 8bdb1dbe6f2b3..5ef5cc2326105 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java @@ -201,16 +201,6 @@ public void sendActiveConsumerChange(long consumerId, boolean isActive) { cnx.ctx().voidPromise()); } - @Override - public void sendSuccess(long requestId) { - cnx.ctx().writeAndFlush(Commands.newSuccess(requestId), cnx.ctx().voidPromise()); - } - - @Override - public void sendError(long requestId, ServerError error, String message) { - cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message), cnx.ctx().voidPromise()); - } - @Override public void sendReachedEndOfTopic(long consumerId) { // Only send notification if the client understand the command diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java index 3ab40b269d91e..9ea7a94bac202 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java @@ -207,7 +207,7 @@ public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTop long requestId = commandWatchTopicListClose.getRequestId(); long watcherId = commandWatchTopicListClose.getWatcherId(); deleteTopicListWatcher(watcherId); - connection.getCommandSender().sendSuccess(requestId); + connection.getCommandSender().sendSuccessResponse(requestId); } public void deleteTopicListWatcher(Long watcherId) { From bdf3b4320f1517db5002e1407a56745590e83722 Mon Sep 17 00:00:00 2001 From: labuladong Date: Tue, 1 Nov 2022 12:45:24 +0800 Subject: [PATCH 3/5] [fix][broker] read local cookie when start pulsar standalone (#18260) --- .../org/apache/pulsar/PulsarStandalone.java | 3 +- .../apache/pulsar/PulsarStandaloneTest.java | 36 +++++++ ...r_broker_test_standalone_with_rocksdb.conf | 97 +++++++++++++++++++ .../pulsar/metadata/bookkeeper/BKCluster.java | 19 ++++ 4 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java index 3408d7ccfe464..0ff3318057062 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java @@ -440,7 +440,8 @@ public void close() { } } - private void startBookieWithMetadataStore() throws Exception { + @VisibleForTesting + void startBookieWithMetadataStore() throws Exception { if (StringUtils.isBlank(metadataStoreUrl)){ log.info("Starting BK with RocksDb metadata store"); metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java index 5386322565120..357a8e1bd4b0f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/PulsarStandaloneTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar; +import static org.apache.commons.io.FileUtils.cleanDirectory; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.eq; @@ -26,6 +27,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.File; +import java.util.List; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.util.IOUtils; import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.naming.NamespaceName; @@ -35,6 +40,7 @@ import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.TenantResources; +import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = "broker") @@ -91,4 +97,34 @@ public void testCreateNameSpace() throws Exception { verify(admin.namespaces(), times(1)).createNamespace(eq(ns.toString())); } + @Test(groups = "broker") + public void testStandaloneWithRocksDB() throws Exception { + String[] args = new String[]{"--config", + "./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"}; + final int bookieNum = 3; + final File tempDir = IOUtils.createTempDir("standalone", "test"); + + PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args); + standalone.setBkDir(tempDir.getAbsolutePath()); + standalone.setNumOfBk(bookieNum); + + standalone.startBookieWithMetadataStore(); + List firstBsConfs = standalone.bkCluster.getBsConfs(); + Assert.assertEquals(firstBsConfs.size(), bookieNum); + standalone.close(); + + // start twice, read cookie from local folder + standalone.startBookieWithMetadataStore(); + List secondBsConfs = standalone.bkCluster.getBsConfs(); + Assert.assertEquals(secondBsConfs.size(), bookieNum); + + for (int i = 0; i < bookieNum; i++) { + ServerConfiguration conf1 = firstBsConfs.get(i); + ServerConfiguration conf2 = secondBsConfs.get(i); + Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort()); + } + standalone.close(); + cleanDirectory(tempDir); + } + } diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf new file mode 100644 index 0000000000000..d8b26bbbfa99d --- /dev/null +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +applicationName="pulsar_broker" +metadataStoreUrl= +configurationMetadataStoreUrl= +brokerServicePort=6650 +brokerServicePortTls=6651 +webServicePort=8080 +allowLoopback=true +webServicePortTls=4443 +bindAddress=0.0.0.0 +advertisedAddress= +advertisedListeners= +internalListenerName=internal +clusterName="test_cluster" +brokerShutdownTimeoutMs=3000 +backlogQuotaCheckEnabled=true +backlogQuotaCheckIntervalInSeconds=60 +backlogQuotaDefaultLimitGB=50 +brokerDeleteInactiveTopicsEnabled=true +brokerDeleteInactiveTopicsFrequencySeconds=60 +allowAutoTopicCreation=true +allowAutoTopicCreationType=non-partitioned +defaultNumPartitions=1 +messageExpiryCheckIntervalInMinutes=5 +clientLibraryVersionCheckEnabled=false +clientLibraryVersionCheckAllowUnversioned=true +statusFilePath=/tmp/status.html +tlsEnabled=false +tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt +tlsKeyFilePath=/home/local/conf/pulsar/server.key +tlsTrustCertsFilePath= +tlsAllowInsecureConnection=false +authenticationEnabled=false +authorizationEnabled=false +superUserRoles="test_user" +brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled" +brokerClientAuthenticationParameters= +bookkeeperClientAuthenticationPlugin="test_auth_plugin" +bookkeeperClientAuthenticationAppId="test_auth_id" +bookkeeperClientTimeoutInSeconds=30 +bookkeeperClientSpeculativeReadTimeoutInMillis=0 +bookkeeperClientHealthCheckEnabled=true +bookkeeperClientHealthCheckIntervalSeconds=60 +bookkeeperClientHealthCheckErrorThresholdPerInterval=5 +bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800 +bookkeeperClientRackawarePolicyEnabled=true +bookkeeperClientRegionawarePolicyEnabled=false +bookkeeperClientMinNumRacksPerWriteQuorum=2 +bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false +bookkeeperClientReorderReadSequenceEnabled=false +bookkeeperClientIsolationGroups="test_group" +managedLedgerDefaultEnsembleSize=3 +managedLedgerDefaultWriteQuorum=2 +managedLedgerDefaultAckQuorum=2 +managedLedgerCacheSizeMB=1024 +managedLedgerCacheEvictionWatermark=10 +managedLedgerDefaultMarkDeleteRateLimit=0.1 +managedLedgerMaxEntriesPerLedger=50000 +managedLedgerMinLedgerRolloverTimeMinutes=10 +managedLedgerMaxLedgerRolloverTimeMinutes=240 +managedLedgerCursorMaxEntriesPerLedger=50000 +managedLedgerCursorRolloverTimeInSeconds = 14400 +managedLedgerDataReadPriority = bookkeeper-first +loadBalancerEnabled = false +loadBalancerReportUpdateThresholdPercentage=10 +loadBalancerReportUpdateMaxIntervalMinutes=15 +loadBalancerHostUsageCheckIntervalMinutes=1 +loadBalancerSheddingIntervalMinutes=30 +loadBalancerSheddingGracePeriodMinutes=30 +loadBalancerBrokerUnderloadedThresholdPercentage=50 +loadBalancerBrokerOverloadedThresholdPercentage=85 +replicationMetricsEnabled=true +replicationConnectionsPerBroker=16 +replicationProducerQueueSize=1000 +replicatorPrefix=pulsar.repl +brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up +supportedNamespaceBundleSplitAlgorithms=[range_equally_divide] +defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide +maxMessagePublishBufferSizeInMB=-1 diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java index 8a7ea62f7cb5e..c2f3f72ec21c0 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java @@ -30,9 +30,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.Cookie; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.allocator.PoolingPolicy; import org.apache.bookkeeper.common.component.ComponentStarter; @@ -65,6 +68,7 @@ public class BKCluster implements AutoCloseable { // BookKeeper related variables private final List tmpDirs = new ArrayList<>(); private final List bookieComponents = new ArrayList<>(); + @Getter private final List bsConfs = new ArrayList<>(); protected final ServerConfiguration baseConf; @@ -231,9 +235,24 @@ private ServerConfiguration newServerConfiguration(int index) throws Exception { // and 2nd bookie's cookie validation fails port = clusterConf.bkPort; } + File[] cookieDir = dataDir.listFiles((file) -> file.getName().equals("current")); + if (cookieDir != null && cookieDir.length > 0) { + String existBookieAddr = parseBookieAddressFromCookie(cookieDir[0]); + if (existBookieAddr != null) { + baseConf.setAdvertisedAddress(existBookieAddr.split(":")[0]); + port = Integer.parseInt(existBookieAddr.split(":")[1]); + } + } return newServerConfiguration(port, dataDir, new File[]{dataDir}); } + private String parseBookieAddressFromCookie(File dir) throws IOException { + Cookie cookie = Cookie.readFromDirectory(dir); + Pattern pattern = Pattern.compile(".*bookieHost: \"(.*?)\".*", Pattern.DOTALL); + Matcher m = pattern.matcher(cookie.toString()); + return m.find() ? m.group(1) : null; + } + private ClientConfiguration newClientConfiguration() { return new ClientConfiguration(baseConf); } From eaad1940fdbfe4d4324e267047b9164fafc091fb Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Tue, 1 Nov 2022 12:47:42 +0800 Subject: [PATCH 4/5] [fix][client] Fix IllegalThreadStateException when using newThread in ExecutorProvider.ExtendedThreadFactory --- .../java/org/apache/pulsar/client/util/ExecutorProvider.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java index 037aef411a0e9..88654c51300ea 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java @@ -42,7 +42,7 @@ public class ExecutorProvider { public static class ExtendedThreadFactory extends DefaultThreadFactory { @Getter - private Thread thread; + private volatile Thread thread; public ExtendedThreadFactory(String poolName) { super(poolName, false); } @@ -52,9 +52,10 @@ public ExtendedThreadFactory(String poolName, boolean daemon) { @Override public Thread newThread(Runnable r) { - thread = super.newThread(r); + Thread thread = super.newThread(r); thread.setUncaughtExceptionHandler((t, e) -> log.error("Thread {} got uncaught Exception", t.getName(), e)); + this.thread = thread; return thread; } } From 9a454a65d07cf716e2c23a90fb32ca7a88e92bf8 Mon Sep 17 00:00:00 2001 From: WJL3333 Date: Tue, 1 Nov 2022 12:49:06 +0800 Subject: [PATCH 5/5] [improve][txn] Avoid txn id primitive auto boxing (#18270) --- .../buffer/metadata/v2/TxnIDData.java | 4 ++-- .../pulsar/client/api/transaction/TxnID.java | 23 +++++++++++++++---- .../impl/transaction/TransactionImpl.java | 15 +++++++----- 3 files changed, 30 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java index bc19f46f252b2..a3bff6dffd322 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/metadata/v2/TxnIDData.java @@ -54,8 +54,8 @@ public int hashCode() { @Override public boolean equals(Object obj) { if (obj instanceof TxnIDData other) { - return Objects.equals(mostSigBits, other.mostSigBits) - && Objects.equals(leastSigBits, other.leastSigBits); + return mostSigBits == other.mostSigBits + && leastSigBits == other.leastSigBits; } return false; diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java index c6729140dd077..772c709bfc07c 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/transaction/TxnID.java @@ -20,7 +20,9 @@ import java.io.Serializable; import java.util.Objects; +import lombok.AccessLevel; import lombok.Data; +import lombok.Getter; import org.apache.pulsar.common.classification.InterfaceAudience; import org.apache.pulsar.common.classification.InterfaceStability; @@ -48,22 +50,35 @@ public class TxnID implements Serializable { */ private final long leastSigBits; + @Getter(AccessLevel.NONE) + private final transient int hashCode; + + @Getter(AccessLevel.NONE) + private final transient String txnStr; + + public TxnID(long mostSigBits, long leastSigBits) { + this.mostSigBits = mostSigBits; + this.leastSigBits = leastSigBits; + this.hashCode = Objects.hash(mostSigBits, leastSigBits); + this.txnStr = "(" + mostSigBits + "," + leastSigBits + ")"; + } + @Override public String toString() { - return "(" + mostSigBits + "," + leastSigBits + ")"; + return txnStr; } @Override public int hashCode() { - return Objects.hash(mostSigBits, leastSigBits); + return hashCode; } @Override public boolean equals(Object obj) { if (obj instanceof TxnID) { TxnID other = (TxnID) obj; - return Objects.equals(mostSigBits, other.mostSigBits) - && Objects.equals(leastSigBits, other.leastSigBits); + return mostSigBits == other.mostSigBits + && leastSigBits == other.leastSigBits; } return false; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java index c7ec884a51da5..b7e085ed82a85 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java @@ -57,6 +57,8 @@ public class TransactionImpl implements Transaction , TimerTask { private final long txnIdLeastBits; private final long txnIdMostBits; + private final TxnID txnId; + private final Map> registerPartitionMap; private final Map, CompletableFuture> registerSubscriptionMap; private final TransactionCoordinatorClientImpl tcClient; @@ -89,6 +91,7 @@ public void run(Timeout timeout) throws Exception { this.transactionTimeoutMs = transactionTimeoutMs; this.txnIdLeastBits = txnIdLeastBits; this.txnIdMostBits = txnIdMostBits; + this.txnId = new TxnID(this.txnIdMostBits, this.txnIdLeastBits); this.registerPartitionMap = new ConcurrentHashMap<>(); this.registerSubscriptionMap = new ConcurrentHashMap<>(); @@ -109,7 +112,7 @@ public CompletableFuture registerProducedTopic(String topic) { return future.thenCompose(ignored -> CompletableFuture.completedFuture(null)); } else { return tcClient.addPublishPartitionToTxnAsync( - new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic)) + txnId, Lists.newArrayList(topic)) .thenCompose(ignored -> CompletableFuture.completedFuture(null)); } }); @@ -150,7 +153,7 @@ public CompletableFuture registerAckedTopic(String topic, String subscript return future.thenCompose(ignored -> CompletableFuture.completedFuture(null)); } else { return tcClient.addSubscriptionToTxnAsync( - new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription) + txnId, topic, subscription) .thenCompose(ignored -> CompletableFuture.completedFuture(null)); } }); @@ -191,7 +194,7 @@ public CompletableFuture commit() { abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException .TransactionHasOperationFailedException())); } else { - tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits)) + tcClient.commitAsync(txnId) .whenComplete((vx, ex) -> { if (ex != null) { if (ex instanceof TransactionNotFoundException @@ -217,7 +220,7 @@ public CompletableFuture abort() { CompletableFuture abortFuture = new CompletableFuture<>(); this.state = State.ABORTING; opFuture.whenComplete((v, e) -> { - tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> { + tcClient.abortAsync(txnId).whenComplete((vx, ex) -> { if (ex != null) { if (ex instanceof TransactionNotFoundException @@ -239,7 +242,7 @@ public CompletableFuture abort() { @Override public TxnID getTxnID() { - return new TxnID(txnIdMostBits, txnIdLeastBits); + return this.txnId; } @Override @@ -253,7 +256,7 @@ public boolean checkIfOpen(CompletableFuture completableFuture) { } else { completableFuture .completeExceptionally(new InvalidTxnStatusException( - new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name())); + txnId.toString(), state.name(), State.OPEN.name())); return false; } }