Skip to content

Commit

Permalink
Merge branch 'master' into dev/batch_metadata_store_stats
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Nov 1, 2022
2 parents 0d79b3f + 9a454a6 commit 52b0b31
Show file tree
Hide file tree
Showing 15 changed files with 247 additions and 34 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1149,6 +1149,10 @@ managedLedgerOffloadDeletionLagMs=14400000
# (default is -1, which is disabled)
managedLedgerOffloadAutoTriggerSizeThresholdBytes=-1

# Read priority when ledgers exists in both bookkeeper and the second layer storage
# (tiered-storage-first/bookkeeper-first, default is tiered-storage-first)
managedLedgerDataReadPriority=tiered-storage-first

# The number of seconds before triggering automatic offload to long term storage
# (default is -1, which is disabled)
managedLedgerOffloadThresholdInSeconds=-1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ public void close() {
}
}

private void startBookieWithMetadataStore() throws Exception {
@VisibleForTesting
void startBookieWithMetadataStore() throws Exception {
if (StringUtils.isBlank(metadataStoreUrl)){
log.info("Starting BK with RocksDb metadata store");
metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,10 +395,10 @@ public void doUnsubscribe(final long requestId) {
subscription.doUnsubscribe(this).thenAccept(v -> {
log.info("Unsubscribed successfully from {}", subscription);
cnx.removedConsumer(this);
cnx.getCommandSender().sendSuccess(requestId);
cnx.getCommandSender().sendSuccessResponse(requestId);
}).exceptionally(exception -> {
log.warn("Unsubscribe failed for {}", subscription, exception);
cnx.getCommandSender().sendError(requestId, BrokerServiceException.getClientErrorCode(exception),
cnx.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception),
exception.getCause().getMessage());
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,6 @@ void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boo

void sendActiveConsumerChange(long consumerId, boolean isActive);

void sendSuccess(long requestId);

void sendError(long requestId, ServerError error, String message);

void sendReachedEndOfTopic(long consumerId);

boolean sendTopicMigrated(ResourceType type, long resourceId, String brokerUrl, String brokerUrlTls);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,6 @@ public void sendActiveConsumerChange(long consumerId, boolean isActive) {
cnx.ctx().voidPromise());
}

@Override
public void sendSuccess(long requestId) {
cnx.ctx().writeAndFlush(Commands.newSuccess(requestId), cnx.ctx().voidPromise());
}

@Override
public void sendError(long requestId, ServerError error, String message) {
cnx.ctx().writeAndFlush(Commands.newError(requestId, error, message), cnx.ctx().voidPromise());
}

@Override
public void sendReachedEndOfTopic(long consumerId) {
// Only send notification if the client understand the command
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void handleWatchTopicListClose(CommandWatchTopicListClose commandWatchTop
long requestId = commandWatchTopicListClose.getRequestId();
long watcherId = commandWatchTopicListClose.getWatcherId();
deleteTopicListWatcher(watcherId);
connection.getCommandSender().sendSuccess(requestId);
connection.getCommandSender().sendSuccessResponse(requestId);
}

public void deleteTopicListWatcher(Long watcherId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ public int hashCode() {
@Override
public boolean equals(Object obj) {
if (obj instanceof TxnIDData other) {
return Objects.equals(mostSigBits, other.mostSigBits)
&& Objects.equals(leastSigBits, other.leastSigBits);
return mostSigBits == other.mostSigBits
&& leastSigBits == other.leastSigBits;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar;

import static org.apache.commons.io.FileUtils.cleanDirectory;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.eq;
Expand All @@ -26,6 +27,10 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.File;
import java.util.List;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -35,6 +40,7 @@
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = "broker")
Expand Down Expand Up @@ -91,4 +97,34 @@ public void testCreateNameSpace() throws Exception {
verify(admin.namespaces(), times(1)).createNamespace(eq(ns.toString()));
}

@Test(groups = "broker")
public void testStandaloneWithRocksDB() throws Exception {
String[] args = new String[]{"--config",
"./src/test/resources/configurations/pulsar_broker_test_standalone_with_rocksdb.conf"};
final int bookieNum = 3;
final File tempDir = IOUtils.createTempDir("standalone", "test");

PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
standalone.setBkDir(tempDir.getAbsolutePath());
standalone.setNumOfBk(bookieNum);

standalone.startBookieWithMetadataStore();
List<ServerConfiguration> firstBsConfs = standalone.bkCluster.getBsConfs();
Assert.assertEquals(firstBsConfs.size(), bookieNum);
standalone.close();

// start twice, read cookie from local folder
standalone.startBookieWithMetadataStore();
List<ServerConfiguration> secondBsConfs = standalone.bkCluster.getBsConfs();
Assert.assertEquals(secondBsConfs.size(), bookieNum);

for (int i = 0; i < bookieNum; i++) {
ServerConfiguration conf1 = firstBsConfs.get(i);
ServerConfiguration conf2 = secondBsConfs.get(i);
Assert.assertEquals(conf1.getBookiePort(), conf2.getBookiePort());
}
standalone.close();
cleanDirectory(tempDir);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

applicationName="pulsar_broker"
metadataStoreUrl=
configurationMetadataStoreUrl=
brokerServicePort=6650
brokerServicePortTls=6651
webServicePort=8080
allowLoopback=true
webServicePortTls=4443
bindAddress=0.0.0.0
advertisedAddress=
advertisedListeners=
internalListenerName=internal
clusterName="test_cluster"
brokerShutdownTimeoutMs=3000
backlogQuotaCheckEnabled=true
backlogQuotaCheckIntervalInSeconds=60
backlogQuotaDefaultLimitGB=50
brokerDeleteInactiveTopicsEnabled=true
brokerDeleteInactiveTopicsFrequencySeconds=60
allowAutoTopicCreation=true
allowAutoTopicCreationType=non-partitioned
defaultNumPartitions=1
messageExpiryCheckIntervalInMinutes=5
clientLibraryVersionCheckEnabled=false
clientLibraryVersionCheckAllowUnversioned=true
statusFilePath=/tmp/status.html
tlsEnabled=false
tlsCertificateFilePath=/usr/local/conf/pulsar/server.crt
tlsKeyFilePath=/home/local/conf/pulsar/server.key
tlsTrustCertsFilePath=
tlsAllowInsecureConnection=false
authenticationEnabled=false
authorizationEnabled=false
superUserRoles="test_user"
brokerClientAuthenticationPlugin="org.apache.pulsar.client.impl.auth.AuthenticationDisabled"
brokerClientAuthenticationParameters=
bookkeeperClientAuthenticationPlugin="test_auth_plugin"
bookkeeperClientAuthenticationAppId="test_auth_id"
bookkeeperClientTimeoutInSeconds=30
bookkeeperClientSpeculativeReadTimeoutInMillis=0
bookkeeperClientHealthCheckEnabled=true
bookkeeperClientHealthCheckIntervalSeconds=60
bookkeeperClientHealthCheckErrorThresholdPerInterval=5
bookkeeperClientHealthCheckQuarantineTimeInSeconds=1800
bookkeeperClientRackawarePolicyEnabled=true
bookkeeperClientRegionawarePolicyEnabled=false
bookkeeperClientMinNumRacksPerWriteQuorum=2
bookkeeperClientEnforceMinNumRacksPerWriteQuorum=false
bookkeeperClientReorderReadSequenceEnabled=false
bookkeeperClientIsolationGroups="test_group"
managedLedgerDefaultEnsembleSize=3
managedLedgerDefaultWriteQuorum=2
managedLedgerDefaultAckQuorum=2
managedLedgerCacheSizeMB=1024
managedLedgerCacheEvictionWatermark=10
managedLedgerDefaultMarkDeleteRateLimit=0.1
managedLedgerMaxEntriesPerLedger=50000
managedLedgerMinLedgerRolloverTimeMinutes=10
managedLedgerMaxLedgerRolloverTimeMinutes=240
managedLedgerCursorMaxEntriesPerLedger=50000
managedLedgerCursorRolloverTimeInSeconds = 14400
managedLedgerDataReadPriority = bookkeeper-first
loadBalancerEnabled = false
loadBalancerReportUpdateThresholdPercentage=10
loadBalancerReportUpdateMaxIntervalMinutes=15
loadBalancerHostUsageCheckIntervalMinutes=1
loadBalancerSheddingIntervalMinutes=30
loadBalancerSheddingGracePeriodMinutes=30
loadBalancerBrokerUnderloadedThresholdPercentage=50
loadBalancerBrokerOverloadedThresholdPercentage=85
replicationMetricsEnabled=true
replicationConnectionsPerBroker=16
replicationProducerQueueSize=1000
replicatorPrefix=pulsar.repl
brokerDeleteInactiveTopicsMode=delete_when_subscriptions_caught_up
supportedNamespaceBundleSplitAlgorithms=[range_equally_divide]
defaultNamespaceBundleSplitAlgorithm=topic_count_equally_divide
maxMessagePublishBufferSizeInMB=-1
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Serializable;
import java.util.Objects;
import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;

Expand Down Expand Up @@ -48,22 +50,35 @@ public class TxnID implements Serializable {
*/
private final long leastSigBits;

@Getter(AccessLevel.NONE)
private final transient int hashCode;

@Getter(AccessLevel.NONE)
private final transient String txnStr;

public TxnID(long mostSigBits, long leastSigBits) {
this.mostSigBits = mostSigBits;
this.leastSigBits = leastSigBits;
this.hashCode = Objects.hash(mostSigBits, leastSigBits);
this.txnStr = "(" + mostSigBits + "," + leastSigBits + ")";
}

@Override
public String toString() {
return "(" + mostSigBits + "," + leastSigBits + ")";
return txnStr;
}

@Override
public int hashCode() {
return Objects.hash(mostSigBits, leastSigBits);
return hashCode;
}

@Override
public boolean equals(Object obj) {
if (obj instanceof TxnID) {
TxnID other = (TxnID) obj;
return Objects.equals(mostSigBits, other.mostSigBits)
&& Objects.equals(leastSigBits, other.leastSigBits);
return mostSigBits == other.mostSigBits
&& leastSigBits == other.leastSigBits;
}

return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public class TransactionImpl implements Transaction , TimerTask {
private final long txnIdLeastBits;
private final long txnIdMostBits;

private final TxnID txnId;

private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
Expand Down Expand Up @@ -89,6 +91,7 @@ public void run(Timeout timeout) throws Exception {
this.transactionTimeoutMs = transactionTimeoutMs;
this.txnIdLeastBits = txnIdLeastBits;
this.txnIdMostBits = txnIdMostBits;
this.txnId = new TxnID(this.txnIdMostBits, this.txnIdLeastBits);

this.registerPartitionMap = new ConcurrentHashMap<>();
this.registerSubscriptionMap = new ConcurrentHashMap<>();
Expand All @@ -109,7 +112,7 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
} else {
return tcClient.addPublishPartitionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))
txnId, Lists.newArrayList(topic))
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
}
});
Expand Down Expand Up @@ -150,7 +153,7 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
} else {
return tcClient.addSubscriptionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription)
txnId, topic, subscription)
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
}
});
Expand Down Expand Up @@ -191,7 +194,7 @@ public CompletableFuture<Void> commit() {
abort().whenComplete((vx, ex) -> commitFuture.completeExceptionally(new PulsarClientException
.TransactionHasOperationFailedException()));
} else {
tcClient.commitAsync(new TxnID(txnIdMostBits, txnIdLeastBits))
tcClient.commitAsync(txnId)
.whenComplete((vx, ex) -> {
if (ex != null) {
if (ex instanceof TransactionNotFoundException
Expand All @@ -217,7 +220,7 @@ public CompletableFuture<Void> abort() {
CompletableFuture<Void> abortFuture = new CompletableFuture<>();
this.state = State.ABORTING;
opFuture.whenComplete((v, e) -> {
tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
tcClient.abortAsync(txnId).whenComplete((vx, ex) -> {

if (ex != null) {
if (ex instanceof TransactionNotFoundException
Expand All @@ -239,7 +242,7 @@ public CompletableFuture<Void> abort() {

@Override
public TxnID getTxnID() {
return new TxnID(txnIdMostBits, txnIdLeastBits);
return this.txnId;
}

@Override
Expand All @@ -253,7 +256,7 @@ public <T> boolean checkIfOpen(CompletableFuture<T> completableFuture) {
} else {
completableFuture
.completeExceptionally(new InvalidTxnStatusException(
new TxnID(txnIdMostBits, txnIdLeastBits).toString(), state.name(), State.OPEN.name()));
txnId.toString(), state.name(), State.OPEN.name()));
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class ExecutorProvider {

public static class ExtendedThreadFactory extends DefaultThreadFactory {
@Getter
private Thread thread;
private volatile Thread thread;
public ExtendedThreadFactory(String poolName) {
super(poolName, false);
}
Expand All @@ -52,9 +52,10 @@ public ExtendedThreadFactory(String poolName, boolean daemon) {

@Override
public Thread newThread(Runnable r) {
thread = super.newThread(r);
Thread thread = super.newThread(r);
thread.setUncaughtExceptionHandler((t, e) ->
log.error("Thread {} got uncaught Exception", t.getName(), e));
this.thread = thread;
return thread;
}
}
Expand Down
Loading

0 comments on commit 52b0b31

Please sign in to comment.