Skip to content

Commit

Permalink
fix compile
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 committed Aug 18, 2022
1 parent fd7198b commit bfd73f3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public AbstractReplicator(String localCluster, String localTopicName, String rem
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
this.replicatorId = String.format("%s | %s",
StringUtils.equals(localTopicName, remoteTopicName) ?
localTopicName : localTopicName + "-->" + remoteTopicName,
StringUtils.equals(localTopicName, remoteTopicName) ? localTopicName :
localTopicName + "-->" + remoteTopicName,
StringUtils.equals(localCluster, remoteCluster) ? localCluster : localCluster + "-->" + remoteCluster
);
this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
Expand All @@ -95,7 +95,7 @@ public AbstractReplicator(String localCluster, String localTopicName, String rem
STATE_UPDATER.set(this, State.Stopped);
}

abstract protected String getProducerName();
protected abstract String getProducerName();

protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
@Slf4j
public class GeoPersistentReplicator extends PersistentReplicator {

public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster,
BrokerService brokerService, PulsarClientImpl replicationClient)
public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster,
String remoteCluster, BrokerService brokerService,
PulsarClientImpl replicationClient)
throws PulsarServerException {
super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
if (readBatchSize < maxReadBatchSize) {
int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize);
if (log.isDebugEnabled()) {
log.debug("[{}] Increasing read batch size from {} to {}", replicatorId, readBatchSize, newReadBatchSize);
log.debug("[{}] Increasing read batch size from {} to {}", replicatorId, readBatchSize,
newReadBatchSize);
}

readBatchSize = newReadBatchSize;
Expand Down

0 comments on commit bfd73f3

Please sign in to comment.