From bfd73f3a6c8e786627b3aa2ceb4be3d3cf1e130b Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Thu, 18 Aug 2022 15:19:50 +0800 Subject: [PATCH] fix compile --- .../apache/pulsar/broker/service/AbstractReplicator.java | 6 +++--- .../broker/service/persistent/GeoPersistentReplicator.java | 5 +++-- .../broker/service/persistent/PersistentReplicator.java | 3 ++- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 89237e2e3cf99c..a3fe39b7438e70 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -81,8 +81,8 @@ public AbstractReplicator(String localCluster, String localTopicName, String rem this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); this.replicatorId = String.format("%s | %s", - StringUtils.equals(localTopicName, remoteTopicName) ? - localTopicName : localTopicName + "-->" + remoteTopicName, + StringUtils.equals(localTopicName, remoteTopicName) ? localTopicName : + localTopicName + "-->" + remoteTopicName, StringUtils.equals(localCluster, remoteCluster) ? localCluster : localCluster + "-->" + remoteCluster ); this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // @@ -95,7 +95,7 @@ public AbstractReplicator(String localCluster, String localTopicName, String rem STATE_UPDATER.set(this, State.Stopped); } - abstract protected String getProducerName(); + protected abstract String getProducerName(); protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java index ac34e1c1c5862b..e0344976b0d129 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -31,8 +31,9 @@ @Slf4j public class GeoPersistentReplicator extends PersistentReplicator { - public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, - BrokerService brokerService, PulsarClientImpl replicationClient) + public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, + String remoteCluster, BrokerService brokerService, + PulsarClientImpl replicationClient) throws PulsarServerException { super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index c8140a79d8ba6d..952f24a7f3c95c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -271,7 +271,8 @@ public void readEntriesComplete(List entries, Object ctx) { if (readBatchSize < maxReadBatchSize) { int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize); if (log.isDebugEnabled()) { - log.debug("[{}] Increasing read batch size from {} to {}", replicatorId, readBatchSize, newReadBatchSize); + log.debug("[{}] Increasing read batch size from {} to {}", replicatorId, readBatchSize, + newReadBatchSize); } readBatchSize = newReadBatchSize;