From da3e1037d8b8f1cc14df4e24d6d5d891a2dc713b Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Wed, 17 Aug 2022 22:22:26 +0800 Subject: [PATCH] polish replicator --- .../broker/service/AbstractReplicator.java | 55 ++-- .../NonPersistentReplicator.java | 34 ++- .../persistent/GeoPersistentReplicator.java | 175 ++++++++++++ .../persistent/PersistentReplicator.java | 257 +++++------------- .../service/persistent/PersistentTopic.java | 2 +- .../broker/service/PersistentTopicTest.java | 5 +- 6 files changed, 295 insertions(+), 233 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 18e2e2d16c37d..a3fe39b7438e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; @@ -39,11 +40,13 @@ public abstract class AbstractReplicator { protected final BrokerService brokerService; - protected final String topicName; + protected final String localTopicName; protected final String localCluster; + protected final String remoteTopicName; protected final String remoteCluster; protected final PulsarClientImpl replicationClient; protected final PulsarClientImpl client; + protected String replicatorId; protected volatile ProducerImpl producer; public static final String REPL_PRODUCER_NAME_DELIMITER = "-->"; @@ -64,30 +67,36 @@ protected enum State { Stopped, Starting, Started, Stopping } - public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster, - BrokerService brokerService, PulsarClientImpl replicationClient) + public AbstractReplicator(String localCluster, String localTopicName, String remoteCluster, String remoteTopicName, + String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { this.brokerService = brokerService; - this.topicName = topicName; + this.localTopicName = localTopicName; this.replicatorPrefix = replicatorPrefix; this.localCluster = localCluster.intern(); + this.remoteTopicName = remoteTopicName; this.remoteCluster = remoteCluster.intern(); this.replicationClient = replicationClient; this.client = (PulsarClientImpl) brokerService.pulsar().getClient(); this.producer = null; this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); - + this.replicatorId = String.format("%s | %s", + StringUtils.equals(localTopicName, remoteTopicName) ? localTopicName : + localTopicName + "-->" + remoteTopicName, + StringUtils.equals(localCluster, remoteCluster) ? localCluster : localCluster + "-->" + remoteCluster + ); this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) // - .topic(topicName) + .topic(remoteTopicName) .messageRoutingMode(MessageRoutingMode.SinglePartition) .enableBatching(false) .sendTimeout(0, TimeUnit.SECONDS) // .maxPendingMessages(producerQueueSize) // - .producerName(String.format("%s%s%s", getReplicatorName(replicatorPrefix, localCluster), - REPL_PRODUCER_NAME_DELIMITER, remoteCluster)); + .producerName(getProducerName()); STATE_UPDATER.set(this, State.Stopped); } + protected abstract String getProducerName(); + protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer); protected abstract Position getReplicatorReadPosition(); @@ -107,8 +116,8 @@ public synchronized void startProducer() { long waitTimeMs = backOff.next(); if (log.isDebugEnabled()) { log.debug( - "[{}][{} -> {}] waiting for producer to close before attempting to reconnect, retrying in {} s", - topicName, localCluster, remoteCluster, waitTimeMs / 1000.0); + "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s", + replicatorId, waitTimeMs / 1000.0); } // BackOff before retrying brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); @@ -119,30 +128,29 @@ public synchronized void startProducer() { if (state == State.Started) { // Already running if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Replicator was already running", topicName, localCluster, remoteCluster); + log.debug("[{}] Replicator was already running", replicatorId); } } else { - log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName, - localCluster, remoteCluster, state); + log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state); } return; } - log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); + log.info("[{}] Starting replicator", replicatorId); producerBuilder.createAsync().thenAccept(producer -> { readEntries(producer); }).exceptionally(ex -> { if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { long waitTimeMs = backOff.next(); - log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, - localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + log.warn("[{}] Failed to create remote producer ({}), retrying in {} s", + replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); } else { - log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, - localCluster, remoteCluster, STATE_UPDATER.get(this), ex); + log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId, + STATE_UPDATER.get(this), ex); } return null; }); @@ -163,9 +171,9 @@ protected synchronized CompletableFuture closeProducerAsync() { }).exceptionally(ex -> { long waitTimeMs = backOff.next(); log.warn( - "[{}][{} -> {}] Exception: '{}' occurred while trying to close the producer." + "[{}] Exception: '{}' occurred while trying to close the producer." + " retrying again in {} s", - topicName, localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + replicatorId, ex.getMessage(), waitTimeMs / 1000.0); // BackOff before retrying brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS); return null; @@ -183,8 +191,7 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) CompletableFuture disconnectFuture = new CompletableFuture<>(); disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog")); if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", topicName, localCluster - , remoteCluster); + log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId); } return disconnectFuture; } @@ -198,8 +205,8 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) { - log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, - remoteCluster, getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); + log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId, + getReplicatorReadPosition(), getNumberOfEntriesInBacklog()); } return closeProducerAsync(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index b863e9eb3c2cd..961ae5ddc17b9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -50,7 +50,7 @@ public class NonPersistentReplicator extends AbstractReplicator implements Repli public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster, BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, + super(localCluster, topic.getName(), remoteCluster, topic.getName(), topic.getReplicatorPrefix(), brokerService, replicationClient); producerBuilder.blockIfQueueFull(false); @@ -58,18 +58,26 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St startProducer(); } + /** + * @return Producer name format : replicatorPrefix.localCluster-->remoteCluster + */ + @Override + protected String getProducerName() { + return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster; + } + @Override protected void readEntries(Producer producer) { this.producer = (ProducerImpl) producer; if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { - log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster); + log.info("[{}] Created replicator producer", replicatorId); backOff.reset(); } else { log.info( - "[{}][{} -> {}] Replicator was stopped while creating the producer." + "[{}] Replicator was stopped while creating the producer." + " Closing it. Replicator state: {}", - topicName, localCluster, remoteCluster, STATE_UPDATER.get(this)); + replicatorId, STATE_UPDATER.get(this)); STATE_UPDATER.set(this, State.Stopping); closeProducerAsync(); return; @@ -85,8 +93,8 @@ public void sendMessage(Entry entry) { try { msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload); } catch (Throwable t) { - log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName, - localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t); + log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId, + entry.getPosition(), length, t.getMessage(), t); entry.release(); return; } @@ -100,8 +108,8 @@ public void sendMessage(Entry entry) { if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Skipping message at {} / msg-id: {}: replicateTo {}", topicName, - localCluster, remoteCluster, entry.getPosition(), msg.getMessageId(), msg.getReplicateTo()); + log.debug("[{}] Skipping message at {} / msg-id: {}: replicateTo {}", replicatorId, + entry.getPosition(), msg.getMessageId(), msg.getReplicateTo()); } entry.release(); msg.recycle(); @@ -118,8 +126,8 @@ public void sendMessage(Entry entry) { } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] dropping message because replicator producer is not started/writable", - topicName, localCluster, remoteCluster); + log.debug("[{}] dropping message because replicator producer is not started/writable", + replicatorId); } msgDrop.recordEvent(); entry.release(); @@ -167,12 +175,10 @@ private static final class ProducerSendCallback implements SendCallback { @Override public void sendComplete(Exception exception) { if (exception != null) { - log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName, - replicator.localCluster, replicator.remoteCluster, exception); + log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName, - replicator.localCluster, replicator.remoteCluster); + log.debug("[{}] Message persisted on remote broker", replicator.replicatorId); } } entry.release(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java new file mode 100644 index 0000000000000..be61f137d1635 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.broker.service.persistent; + +import io.netty.buffer.ByteBuf; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.schema.SchemaInfo; + +@Slf4j +public class GeoPersistentReplicator extends PersistentReplicator { + + public GeoPersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, + String remoteCluster, BrokerService brokerService, + PulsarClientImpl replicationClient) + throws PulsarServerException { + super(localCluster, topic, cursor, remoteCluster, topic.getName(), brokerService, replicationClient); + } + + /** + * @return Producer name format : replicatorPrefix.localCluster-->remoteCluster + */ + @Override + protected String getProducerName() { + return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster; + } + + @Override + protected boolean replicateEntries(List entries) { + boolean atLeastOneMessageSentForReplication = false; + boolean isEnableReplicatedSubscriptions = + brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); + + try { + // This flag is set to true when we skip at least one local message, + // in order to skip remaining local messages. + boolean isLocalMessageSkippedOnce = false; + boolean skipRemainingMessages = false; + for (int i = 0; i < entries.size(); i++) { + Entry entry = entries.get(i); + // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the + // remote cluster. Rewind the cursor first and continue the message read after fetched the schema. + if (skipRemainingMessages) { + entry.release(); + continue; + } + int length = entry.getLength(); + ByteBuf headersAndPayload = entry.getDataBuffer(); + MessageImpl msg; + try { + msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload); + } catch (Throwable t) { + log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId, + entry.getPosition(), length, t.getMessage(), t); + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + continue; + } + + if (isEnableReplicatedSubscriptions) { + checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload); + } + + if (msg.isReplicated()) { + // Discard messages that were already replicated into this region + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + + if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) { + if (log.isDebugEnabled()) { + log.debug("[{}] Skipping message at position {}, replicateTo {}", replicatorId, + entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + + if (msg.isExpired(messageTTLInSeconds)) { + msgExpired.recordEvent(0 /* no value stat */); + if (log.isDebugEnabled()) { + log.debug("[{}] Discarding expired message at position {}, replicateTo {}", + replicatorId, entry.getPosition(), msg.getReplicateTo()); + } + cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); + entry.release(); + msg.recycle(); + continue; + } + + if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { + // The producer is not ready yet after having stopped/restarted. Drop the message because it will + // recovered when the producer is ready + if (log.isDebugEnabled()) { + log.debug("[{}] Dropping read message at {} because producer is not ready", + replicatorId, entry.getPosition()); + } + isLocalMessageSkippedOnce = true; + entry.release(); + msg.recycle(); + continue; + } + + dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); + + // Increment pending messages for messages produced locally + PENDING_MESSAGES_UPDATER.incrementAndGet(this); + + msgOut.recordEvent(headersAndPayload.readableBytes()); + + msg.setReplicatedFrom(localCluster); + + headersAndPayload.retain(); + + CompletableFuture schemaFuture = getSchemaInfo(msg); + if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) { + entry.release(); + headersAndPayload.release(); + msg.recycle(); + // Mark the replicator is fetching the schema for now and rewind the cursor + // and trigger the next read after complete the schema fetching. + fetchSchemaInProgress = true; + skipRemainingMessages = true; + cursor.cancelPendingReadRequest(); + log.info("[{}] Pause the data replication due to new detected schema", replicatorId); + schemaFuture.whenComplete((__, e) -> { + if (e != null) { + log.warn("[{}] Failed to get schema from local cluster, will try in the next loop", + replicatorId, e); + } + log.info("[{}] Resume the data replication after the schema fetching done", replicatorId); + cursor.rewind(); + fetchSchemaInProgress = false; + readMoreEntries(); + }); + } else { + msg.setSchemaInfoForReplicator(schemaFuture.get()); + producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); + atLeastOneMessageSentForReplication = true; + } + } + } catch (Exception e) { + log.error("[{}] Unexpected exception: {}", replicatorId, e.getMessage(), e); + } + return atLeastOneMessageSentForReplication; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 2e7dbb2fbf19e..87f9ef5480cfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -64,13 +64,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PersistentReplicator extends AbstractReplicator +public abstract class PersistentReplicator extends AbstractReplicator implements Replicator, ReadEntriesCallback, DeleteCallback { private final PersistentTopic topic; protected final ManagedCursor cursor; - private Optional dispatchRateLimiter = Optional.empty(); + protected Optional dispatchRateLimiter = Optional.empty(); private final Object dispatchRateLimiterLock = new Object(); private int readBatchSize; @@ -78,7 +78,7 @@ public class PersistentReplicator extends AbstractReplicator private final int producerQueueThreshold; - private static final AtomicIntegerFieldUpdater PENDING_MESSAGES_UPDATER = + protected static final AtomicIntegerFieldUpdater PENDING_MESSAGES_UPDATER = AtomicIntegerFieldUpdater .newUpdater(PersistentReplicator.class, "pendingMessages"); private volatile int pendingMessages = 0; @@ -91,10 +91,10 @@ public class PersistentReplicator extends AbstractReplicator .newUpdater(PersistentReplicator.class, "havePendingRead"); private volatile int havePendingRead = FALSE; - private final Rate msgOut = new Rate(); - private final Rate msgExpired = new Rate(); + protected final Rate msgOut = new Rate(); + protected final Rate msgExpired = new Rate(); - private int messageTTLInSeconds = 0; + protected int messageTTLInSeconds = 0; private final Backoff readFailureBackoff = new Backoff(1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); @@ -105,24 +105,25 @@ public class PersistentReplicator extends AbstractReplicator private final ReplicatorStatsImpl stats = new ReplicatorStatsImpl(); - private volatile boolean fetchSchemaInProgress = false; + protected volatile boolean fetchSchemaInProgress = false; - public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String localCluster, String remoteCluster, - BrokerService brokerService, PulsarClientImpl replicationClient) + public PersistentReplicator(String localCluster, PersistentTopic localTopic, ManagedCursor cursor, + String remoteCluster, String remoteTopic, + BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException { - super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService, - replicationClient); - this.topic = topic; + super(localCluster, localTopic.getName(), remoteCluster, remoteTopic, localTopic.getReplicatorPrefix(), + brokerService, replicationClient); + this.topic = localTopic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, + this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); readBatchSize = Math.min( producerQueueSize, - topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); - readMaxSizeBytes = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); + localTopic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize()); + readMaxSizeBytes = localTopic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadSizeBytes(); producerQueueThreshold = (int) (producerQueueSize * 0.9); this.initializeDispatchRateLimiterIfNeeded(); @@ -140,7 +141,7 @@ protected void readEntries(org.apache.pulsar.client.api.Producer produce this.producer = (ProducerImpl) producer; if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) { - log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster); + log.info("[{}] Created replicator producer", replicatorId); backOff.reset(); // activate cursor: so, entries can be cached this.cursor.setActive(); @@ -148,9 +149,9 @@ protected void readEntries(org.apache.pulsar.client.api.Producer produce readMoreEntries(); } else { log.info( - "[{}][{} -> {}] Replicator was stopped while creating the producer." + "[{}] Replicator was stopped while creating the producer." + " Closing it. Replicator state: {}", - topicName, localCluster, remoteCluster, STATE_UPDATER.get(this)); + replicatorId, STATE_UPDATER.get(this)); STATE_UPDATER.set(this, State.Stopping); closeProducerAsync(); } @@ -189,8 +190,8 @@ private int getAvailablePermits() { // return 0, if Producer queue is full, it will pause read entries. if (availablePermits <= 0) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Producer queue is full, availablePermits: {}, pause reading", - topicName, localCluster, remoteCluster, availablePermits); + log.debug("[{}] Producer queue is full, availablePermits: {}, pause reading", + replicatorId, availablePermits); } return 0; } @@ -201,9 +202,9 @@ private int getAvailablePermits() { // no permits from rate limit if (!rateLimiter.hasMessageDispatchPermit()) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] message-read exceeded topic replicator message-rate {}/{}," + log.debug("[{}] message-read exceeded topic replicator message-rate {}/{}," + " schedule after a {}", - topicName, localCluster, remoteCluster, + replicatorId, rateLimiter.getDispatchRateOnMsg(), rateLimiter.getDispatchRateOnByte(), MESSAGE_RATE_BACKOFF_MS); @@ -223,8 +224,7 @@ private int getAvailablePermits() { protected void readMoreEntries() { if (fetchSchemaInProgress) { - log.info("[{}][{} -> {}] Skip the reading due to new detected schema", - topicName, localCluster, remoteCluster); + log.info("[{}] Skip the reading due to new detected schema", replicatorId); return; } int availablePermits = getAvailablePermits(); @@ -233,8 +233,7 @@ protected void readMoreEntries() { int messagesToRead = Math.min(availablePermits, readBatchSize); if (!isWritable()) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Throttling replication traffic because producer is not writable", - topicName, localCluster, remoteCluster); + log.debug("[{}] Throttling replication traffic because producer is not writable", replicatorId); } // Minimize the read size if the producer is disconnected or the window is already full messagesToRead = 1; @@ -246,15 +245,14 @@ protected void readMoreEntries() { // Schedule read if (HAVE_PENDING_READ_UPDATER.compareAndSet(this, FALSE, TRUE)) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Schedule read of {} messages", topicName, localCluster, remoteCluster, - messagesToRead); + log.debug("[{}] Schedule read of {} messages", replicatorId, messagesToRead); } cursor.asyncReadEntriesOrWait(messagesToRead, readMaxSizeBytes, this, null, PositionImpl.LATEST); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Not scheduling read due to pending read. Messages To Read {}", topicName, - localCluster, remoteCluster, messagesToRead); + log.debug("[{}] Not scheduling read due to pending read. Messages To Read {}", + replicatorId, messagesToRead); } } } else if (availablePermits == -1) { @@ -263,8 +261,8 @@ protected void readMoreEntries() { () -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] No Permits for reading. availablePermits: {}", - topicName, localCluster, remoteCluster, availablePermits); + log.debug("[{}] No Permits for reading. availablePermits: {}", + replicatorId, availablePermits); } } } @@ -272,16 +270,15 @@ protected void readMoreEntries() { @Override public void readEntriesComplete(List entries, Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Read entries complete of {} messages", topicName, localCluster, remoteCluster, - entries.size()); + log.debug("[{}] Read entries complete of {} messages", replicatorId, entries.size()); } int maxReadBatchSize = topic.getBrokerService().pulsar().getConfiguration().getDispatcherMaxReadBatchSize(); if (readBatchSize < maxReadBatchSize) { int newReadBatchSize = Math.min(readBatchSize * 2, maxReadBatchSize); if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Increasing read batch size from {} to {}", topicName, localCluster, - remoteCluster, readBatchSize, newReadBatchSize); + log.debug("[{}] Increasing read batch size from {} to {}", replicatorId, readBatchSize, + newReadBatchSize); } readBatchSize = newReadBatchSize; @@ -289,147 +286,28 @@ public void readEntriesComplete(List entries, Object ctx) { readFailureBackoff.reduceToHalf(); - boolean atLeastOneMessageSentForReplication = false; - boolean isEnableReplicatedSubscriptions = - brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); - - try { - // This flag is set to true when we skip atleast one local message, - // in order to skip remaining local messages. - boolean isLocalMessageSkippedOnce = false; - boolean skipRemainingMessages = false; - for (int i = 0; i < entries.size(); i++) { - Entry entry = entries.get(i); - // Skip the messages since the replicator need to fetch the schema info to replicate the schema to the - // remote cluster. Rewind the cursor first and continue the message read after fetched the schema. - if (skipRemainingMessages) { - entry.release(); - continue; - } - int length = entry.getLength(); - ByteBuf headersAndPayload = entry.getDataBuffer(); - MessageImpl msg; - try { - msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload); - } catch (Throwable t) { - log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName, - localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t); - cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); - entry.release(); - continue; - } - - if (isEnableReplicatedSubscriptions) { - checkReplicatedSubscriptionMarker(entry.getPosition(), msg, headersAndPayload); - } - - if (msg.isReplicated()) { - // Discard messages that were already replicated into this region - cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); - entry.release(); - msg.recycle(); - continue; - } - - if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) { - if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Skipping message at position {}, replicateTo {}", topicName, - localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo()); - } - cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); - entry.release(); - msg.recycle(); - continue; - } - - if (msg.isExpired(messageTTLInSeconds)) { - msgExpired.recordEvent(0 /* no value stat */); - if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Discarding expired message at position {}, replicateTo {}", topicName, - localCluster, remoteCluster, entry.getPosition(), msg.getReplicateTo()); - } - cursor.asyncDelete(entry.getPosition(), this, entry.getPosition()); - entry.release(); - msg.recycle(); - continue; - } - - if (STATE_UPDATER.get(this) != State.Started || isLocalMessageSkippedOnce) { - // The producer is not ready yet after having stopped/restarted. Drop the message because it will - // recovered when the producer is ready - if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Dropping read message at {} because producer is not ready", topicName, - localCluster, remoteCluster, entry.getPosition()); - } - isLocalMessageSkippedOnce = true; - entry.release(); - msg.recycle(); - continue; - } - - dispatchRateLimiter.ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(1, entry.getLength())); - - // Increment pending messages for messages produced locally - PENDING_MESSAGES_UPDATER.incrementAndGet(this); - - msgOut.recordEvent(headersAndPayload.readableBytes()); - - msg.setReplicatedFrom(localCluster); - - headersAndPayload.retain(); - - CompletableFuture schemaFuture = getSchemaInfo(msg); - if (!schemaFuture.isDone() || schemaFuture.isCompletedExceptionally()) { - entry.release(); - headersAndPayload.release(); - msg.recycle(); - // Mark the replicator is fetching the schema for now and rewind the cursor - // and trigger the next read after complete the schema fetching. - fetchSchemaInProgress = true; - skipRemainingMessages = true; - cursor.cancelPendingReadRequest(); - log.info("[{}][{} -> {}] Pause the data replication due to new detected schema", topicName, - localCluster, remoteCluster); - schemaFuture.whenComplete((__, e) -> { - if (e != null) { - log.warn("[{}][{} -> {}] Failed to get schema from local cluster, will try in the next loop", - topicName, localCluster, remoteCluster, e); - } - log.info("[{}][{} -> {}] Resume the data replication after the schema fetching done", topicName, - localCluster, remoteCluster); - cursor.rewind(); - fetchSchemaInProgress = false; - readMoreEntries(); - }); - } else { - msg.setSchemaInfoForReplicator(schemaFuture.get()); - producer.sendAsync(msg, ProducerSendCallback.create(this, entry, msg)); - atLeastOneMessageSentForReplication = true; - } - } - } catch (Exception e) { - log.error("[{}][{} -> {}] Unexpected exception: {}", topicName, localCluster, remoteCluster, e.getMessage(), - e); - } + boolean atLeastOneMessageSentForReplication = replicateEntries(entries); HAVE_PENDING_READ_UPDATER.set(this, FALSE); if (atLeastOneMessageSentForReplication && !isWritable()) { // Don't read any more entries until the current pending entries are persisted if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Pausing replication traffic. at-least-one: {} is-writable: {}", topicName, - localCluster, remoteCluster, atLeastOneMessageSentForReplication, isWritable()); + log.debug("[{}] Pausing replication traffic. at-least-one: {} is-writable: {}", replicatorId, + atLeastOneMessageSentForReplication, isWritable()); } } else { readMoreEntries(); } } - private CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { + protected abstract boolean replicateEntries(List entries); + + protected CompletableFuture getSchemaInfo(MessageImpl msg) throws ExecutionException { if (msg.getSchemaVersion() == null || msg.getSchemaVersion().length == 0) { return CompletableFuture.completedFuture(null); } - return client.getSchemaProviderLoadingCache().get(topicName) + return client.getSchemaProviderLoadingCache().get(localTopicName) .getSchemaByVersion(msg.getSchemaVersion()); } @@ -443,7 +321,7 @@ public void updateCursorState() { } } - private static final class ProducerSendCallback implements SendCallback { + protected static final class ProducerSendCallback implements SendCallback { private PersistentReplicator replicator; private Entry entry; private MessageImpl msg; @@ -451,14 +329,12 @@ private static final class ProducerSendCallback implements SendCallback { @Override public void sendComplete(Exception exception) { if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) { - log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName, - replicator.localCluster, replicator.remoteCluster, exception); + log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception); // cursor should be rewinded since it was incremented when readMoreEntries replicator.cursor.rewind(); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName, - replicator.localCluster, replicator.remoteCluster); + log.debug("[{}] Message persisted on remote broker", replicator.replicatorId, exception); } replicator.cursor.asyncDelete(entry.getPosition(), replicator, entry.getPosition()); } @@ -478,8 +354,8 @@ public void sendComplete(Exception exception) { replicator.readMoreEntries(); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Not resuming reads. pending: {} is-writable: {}", - replicator.topicName, replicator.localCluster, replicator.remoteCluster, pending, + log.debug("[{}] Not resuming reads. pending: {} is-writable: {}", + replicator.replicatorId, pending, replicator.producer.isWritable()); } } @@ -543,9 +419,9 @@ public CompletableFuture getFuture() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { if (STATE_UPDATER.get(this) != State.Started) { - log.info("[{}][{} -> {}] Replicator was stopped while reading entries." + log.info("[{}] Replicator was stopped while reading entries." + " Stop reading. Replicator state: {}", - topic, localCluster, remoteCluster, STATE_UPDATER.get(this)); + replicatorId, STATE_UPDATER.get(this)); return; } @@ -555,21 +431,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { long waitTimeMillis = readFailureBackoff.next(); if (exception instanceof CursorAlreadyClosedException) { - log.error("[{}][{} -> {}] Error reading entries because replicator is" + log.error("[{}] Error reading entries because replicator is" + " already deleted and cursor is already closed {}, ({})", - topic, localCluster, - remoteCluster, ctx, exception.getMessage(), exception); + replicatorId, ctx, exception.getMessage(), exception); // replicator is already deleted and cursor is already closed so, producer should also be stopped closeProducerAsync(); return; } else if (!(exception instanceof TooManyRequestsException)) { - log.error("[{}][{} -> {}] Error reading entries at {}. Retrying to read in {}s. ({})", - topic, localCluster, - remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); + log.error("[{}] Error reading entries at {}. Retrying to read in {}s. ({})", + replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); } else { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", - topicName, localCluster, remoteCluster, ctx, waitTimeMillis / 1000.0, exception.getMessage(), + log.debug("[{}] Throttled by bookies while reading at {}. Retrying to read in {}s. ({})", + replicatorId, ctx, waitTimeMillis / 1000.0, exception.getMessage(), exception); } } @@ -582,7 +456,7 @@ public CompletableFuture clearBacklog() { CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Backlog size before clearing: {}", topicName, localCluster, remoteCluster, + log.debug("[{}] Backlog size before clearing: {}", replicatorId, cursor.getNumberOfEntriesInBacklog(false)); } @@ -590,7 +464,7 @@ public CompletableFuture clearBacklog() { @Override public void clearBacklogComplete(Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Backlog size after clearing: {}", topicName, localCluster, remoteCluster, + log.debug("[{}] Backlog size after clearing: {}", replicatorId, cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); @@ -598,7 +472,7 @@ public void clearBacklogComplete(Object ctx) { @Override public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}][{} -> {}] Failed to clear backlog", topicName, localCluster, remoteCluster, exception); + log.error("[{}] Failed to clear backlog", replicatorId, exception); future.completeExceptionally(exception); } }, null); @@ -610,7 +484,7 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Skipping {} messages, current backlog {}", topicName, localCluster, remoteCluster, + log.debug("[{}] Skipping {} messages, current backlog {}", replicatorId, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } cursor.asyncSkipEntries(numMessagesToSkip, IndividualDeletedEntries.Exclude, @@ -618,15 +492,15 @@ public CompletableFuture skipMessages(int numMessagesToSkip) { @Override public void skipEntriesComplete(Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Skipped {} messages, new backlog {}", topicName, localCluster, - remoteCluster, numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); + log.debug("[{}] Skipped {} messages, new backlog {}", replicatorId, + numMessagesToSkip, cursor.getNumberOfEntriesInBacklog(false)); } future.complete(null); } @Override public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}][{} -> {}] Failed to skip {} messages", topicName, localCluster, remoteCluster, + log.error("[{}] Failed to skip {} messages", replicatorId, numMessagesToSkip, exception); future.completeExceptionally(exception); } @@ -639,7 +513,7 @@ public CompletableFuture peekNthMessage(int messagePosition) { CompletableFuture future = new CompletableFuture<>(); if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Getting message at position {}", topicName, localCluster, remoteCluster, + log.debug("[{}] Getting message at position {}", replicatorId, messagePosition); } @@ -662,13 +536,13 @@ public void readEntryComplete(Entry entry, Object ctx) { @Override public void deleteComplete(Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{} -> {}] Deleted message at {}", topicName, localCluster, remoteCluster, ctx); + log.debug("[{}] Deleted message at {}", replicatorId, ctx); } } @Override public void deleteFailed(ManagedLedgerException exception, Object ctx) { - log.error("[{}][{} -> {}] Failed to delete message at {}: {}", topicName, localCluster, remoteCluster, ctx, + log.error("[{}] Failed to delete message at {}: {}", replicatorId, ctx, exception.getMessage(), exception); if (ctx instanceof PositionImpl) { PositionImpl deletedEntry = (PositionImpl) ctx; @@ -752,7 +626,7 @@ public void updateRateLimiter() { dispatchRateLimiter.ifPresent(DispatchRateLimiter::updateDispatchRate); } - private void checkReplicatedSubscriptionMarker(Position position, MessageImpl msg, ByteBuf payload) { + protected void checkReplicatedSubscriptionMarker(Position position, MessageImpl msg, ByteBuf payload) { if (!msg.getMessageBuilder().hasMarkerType()) { // No marker is defined return; @@ -796,8 +670,7 @@ public synchronized CompletableFuture disconnect(boolean failIfHasBacklog) }).exceptionally(ex -> { Throwable t = (ex instanceof CompletionException ? ex.getCause() : ex); if (!(t instanceof TopicBusyException)) { - log.error("[{}][{} -> {}] Failed to close dispatch rate limiter: {}", topicName, localCluster, - remoteCluster, ex.getMessage()); + log.error("[{}] Failed to close dispatch rate limiter: {}", replicatorId, ex.getMessage()); } future.completeExceptionally(t); return null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 87a122d3c7ae4..1ae0ac4a255fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1522,7 +1522,7 @@ protected CompletableFuture addReplicationCluster(String remoteCluster, Ma .thenAccept(replicationClient -> { Replicator replicator = replicators.computeIfAbsent(remoteCluster, r -> { try { - return new PersistentReplicator(PersistentTopic.this, cursor, localCluster, + return new GeoPersistentReplicator(PersistentTopic.this, cursor, localCluster, remoteCluster, brokerService, (PulsarClientImpl) replicationClient); } catch (PulsarServerException e) { log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 864a4e52ee28c..1404ce7c7988e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -106,6 +106,7 @@ import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.broker.resources.TopicResources; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; +import org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; @@ -1793,7 +1794,7 @@ public void testAtomicReplicationRemoval() throws Exception { doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); PersistentReplicator replicator = spy( - new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService, + new GeoPersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, brokerService.pulsar().getPulsarResources().getClusterResources() .getCluster(remoteCluster)))); @@ -1841,7 +1842,7 @@ public void testClosingReplicationProducerTwice() throws Exception { ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); - PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, + PersistentReplicator replicator = new GeoPersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService, (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster, brokerService.pulsar().getPulsarResources().getClusterResources() .getCluster(remoteCluster)));