[feat][broker]PIP-180: ShadowTopic - Part I - Refactor replicator for ShadowReplicator (#17150)
Jason918 authored Aug 22, 2022
1 parent 3fb4b29 commit 43759d2
Showing 6 changed files with 295 additions and 233 deletions.
AbstractReplicator.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
 import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
@@ -39,11 +40,13 @@
 public abstract class AbstractReplicator {

     protected final BrokerService brokerService;
-    protected final String topicName;
+    protected final String localTopicName;
     protected final String localCluster;
+    protected final String remoteTopicName;
     protected final String remoteCluster;
     protected final PulsarClientImpl replicationClient;
     protected final PulsarClientImpl client;
+    protected String replicatorId;

     protected volatile ProducerImpl producer;
     public static final String REPL_PRODUCER_NAME_DELIMITER = "-->";
@@ -64,30 +67,36 @@ protected enum State {
         Stopped, Starting, Started, Stopping
     }

-    public AbstractReplicator(String topicName, String replicatorPrefix, String localCluster, String remoteCluster,
-                              BrokerService brokerService, PulsarClientImpl replicationClient)
+    public AbstractReplicator(String localCluster, String localTopicName, String remoteCluster, String remoteTopicName,
+                              String replicatorPrefix, BrokerService brokerService, PulsarClientImpl replicationClient)
             throws PulsarServerException {
         this.brokerService = brokerService;
-        this.topicName = topicName;
+        this.localTopicName = localTopicName;
         this.replicatorPrefix = replicatorPrefix;
         this.localCluster = localCluster.intern();
+        this.remoteTopicName = remoteTopicName;
         this.remoteCluster = remoteCluster.intern();
         this.replicationClient = replicationClient;
         this.client = (PulsarClientImpl) brokerService.pulsar().getClient();
         this.producer = null;
         this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

+        this.replicatorId = String.format("%s | %s",
+                StringUtils.equals(localTopicName, remoteTopicName) ? localTopicName :
+                        localTopicName + "-->" + remoteTopicName,
+                StringUtils.equals(localCluster, remoteCluster) ? localCluster : localCluster + "-->" + remoteCluster
+        );
         this.producerBuilder = replicationClient.newProducer(Schema.AUTO_PRODUCE_BYTES()) //
-                .topic(topicName)
+                .topic(remoteTopicName)
                 .messageRoutingMode(MessageRoutingMode.SinglePartition)
                 .enableBatching(false)
                 .sendTimeout(0, TimeUnit.SECONDS) //
                 .maxPendingMessages(producerQueueSize) //
-                .producerName(String.format("%s%s%s", getReplicatorName(replicatorPrefix, localCluster),
-                        REPL_PRODUCER_NAME_DELIMITER, remoteCluster));
+                .producerName(getProducerName());
         STATE_UPDATER.set(this, State.Stopped);
     }

+    protected abstract String getProducerName();
+
     protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

     protected abstract Position getReplicatorReadPosition();
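
As an aside, here is a small, self-contained sketch of the replicatorId format assembled in the constructor above. It reuses the same StringUtils-based logic shown in the diff; the topic and cluster names are hypothetical example values (commons-lang3 on the classpath is assumed), not values taken from this commit:

import org.apache.commons.lang3.StringUtils;

// Standalone sketch of the replicatorId format built in AbstractReplicator's constructor.
// The topic and cluster names used below are hypothetical examples.
public class ReplicatorIdSketch {

    static String replicatorId(String localTopicName, String remoteTopicName,
                               String localCluster, String remoteCluster) {
        return String.format("%s | %s",
                StringUtils.equals(localTopicName, remoteTopicName) ? localTopicName
                        : localTopicName + "-->" + remoteTopicName,
                StringUtils.equals(localCluster, remoteCluster) ? localCluster
                        : localCluster + "-->" + remoteCluster);
    }

    public static void main(String[] args) {
        // Ordinary geo-replication: same topic name on both ends, different clusters.
        System.out.println(replicatorId("persistent://tenant/ns/orders", "persistent://tenant/ns/orders",
                "cluster-a", "cluster-b"));
        // prints: persistent://tenant/ns/orders | cluster-a-->cluster-b

        // Shadow-style replication: different topic names, same cluster.
        System.out.println(replicatorId("persistent://tenant/ns/orders", "persistent://tenant/ns/orders-shadow",
                "cluster-a", "cluster-a"));
        // prints: persistent://tenant/ns/orders-->persistent://tenant/ns/orders-shadow | cluster-a
    }
}

With this format, an ordinary geo-replicator keeps the familiar "topic | localCluster-->remoteCluster" shape in log lines, while a replicator whose local and remote topic names differ encodes both names instead.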
@@ -107,8 +116,8 @@ public synchronized void startProducer() {
             long waitTimeMs = backOff.next();
             if (log.isDebugEnabled()) {
                 log.debug(
-                        "[{}][{} -> {}] waiting for producer to close before attempting to reconnect, retrying in {} s",
-                        topicName, localCluster, remoteCluster, waitTimeMs / 1000.0);
+                        "[{}] waiting for producer to close before attempting to reconnect, retrying in {} s",
+                        replicatorId, waitTimeMs / 1000.0);
             }
             // BackOff before retrying
             brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
@@ -119,30 +128,29 @@ public synchronized void startProducer() {
             if (state == State.Started) {
                 // Already running
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}][{} -> {}] Replicator was already running", topicName, localCluster, remoteCluster);
+                    log.debug("[{}] Replicator was already running", replicatorId);
                 }
             } else {
-                log.info("[{}][{} -> {}] Replicator already being started. Replicator state: {}", topicName,
-                        localCluster, remoteCluster, state);
+                log.info("[{}] Replicator already being started. Replicator state: {}", replicatorId, state);
             }

             return;
         }

-        log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
+        log.info("[{}] Starting replicator", replicatorId);
         producerBuilder.createAsync().thenAccept(producer -> {
             readEntries(producer);
         }).exceptionally(ex -> {
             if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
                 long waitTimeMs = backOff.next();
-                log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
-                        localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
+                log.warn("[{}] Failed to create remote producer ({}), retrying in {} s",
+                        replicatorId, ex.getMessage(), waitTimeMs / 1000.0);

                 // BackOff before retrying
                 brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
             } else {
-                log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
-                        localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
+                log.warn("[{}] Failed to create remote producer. Replicator state: {}", replicatorId,
+                        STATE_UPDATER.get(this), ex);
             }
             return null;
         });
@@ -163,9 +171,9 @@ protected synchronized CompletableFuture<Void> closeProducerAsync() {
         }).exceptionally(ex -> {
             long waitTimeMs = backOff.next();
             log.warn(
-                    "[{}][{} -> {}] Exception: '{}' occurred while trying to close the producer."
+                    "[{}] Exception: '{}' occurred while trying to close the producer."
                             + " retrying again in {} s",
-                    topicName, localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);
+                    replicatorId, ex.getMessage(), waitTimeMs / 1000.0);
             // BackOff before retrying
             brokerService.executor().schedule(this::closeProducerAsync, waitTimeMs, TimeUnit.MILLISECONDS);
             return null;
@@ -183,8 +191,7 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)
             CompletableFuture<Void> disconnectFuture = new CompletableFuture<>();
             disconnectFuture.completeExceptionally(new TopicBusyException("Cannot close a replicator with backlog"));
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{} -> {}] Replicator disconnect failed since topic has backlog", topicName, localCluster
-                        , remoteCluster);
+                log.debug("[{}] Replicator disconnect failed since topic has backlog", replicatorId);
             }
             return disconnectFuture;
         }
@@ -198,8 +205,8 @@ public synchronized CompletableFuture<Void> disconnect(boolean failIfHasBacklog)

         if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
                 || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping)) {
-            log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
-                    remoteCluster, getReplicatorReadPosition(), getNumberOfEntriesInBacklog());
+            log.info("[{}] Disconnect replicator at position {} with backlog {}", replicatorId,
+                    getReplicatorReadPosition(), getNumberOfEntriesInBacklog());
         }

         return closeProducerAsync();
NonPersistentReplicator.java
@@ -50,26 +50,34 @@ public class NonPersistentReplicator extends AbstractReplicator implements Replicator {

     public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, String remoteCluster,
             BrokerService brokerService, PulsarClientImpl replicationClient) throws PulsarServerException {
-        super(topic.getName(), topic.getReplicatorPrefix(), localCluster, remoteCluster, brokerService,
+        super(localCluster, topic.getName(), remoteCluster, topic.getName(), topic.getReplicatorPrefix(), brokerService,
                 replicationClient);

         producerBuilder.blockIfQueueFull(false);

         startProducer();
     }

+    /**
+     * @return Producer name format : replicatorPrefix.localCluster-->remoteCluster
+     */
+    @Override
+    protected String getProducerName() {
+        return getReplicatorName(replicatorPrefix, localCluster) + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
+    }
+
     @Override
     protected void readEntries(Producer<byte[]> producer) {
         this.producer = (ProducerImpl) producer;

         if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
-            log.info("[{}][{} -> {}] Created replicator producer", topicName, localCluster, remoteCluster);
+            log.info("[{}] Created replicator producer", replicatorId);
             backOff.reset();
         } else {
             log.info(
-                    "[{}][{} -> {}] Replicator was stopped while creating the producer."
+                    "[{}] Replicator was stopped while creating the producer."
                             + " Closing it. Replicator state: {}",
-                    topicName, localCluster, remoteCluster, STATE_UPDATER.get(this));
+                    replicatorId, STATE_UPDATER.get(this));
             STATE_UPDATER.set(this, State.Stopping);
             closeProducerAsync();
             return;
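
Picking up the getProducerName() javadoc in the hunk above: a minimal, self-contained sketch of the replication producer name it documents, replicatorPrefix.localCluster-->remoteCluster. The prefix and cluster values below are hypothetical examples, not taken from this commit:

// Minimal sketch of the replication producer name format documented in the javadoc above.
// Example values are hypothetical.
public class ReplProducerNameSketch {
    static final String REPL_PRODUCER_NAME_DELIMITER = "-->";

    static String producerName(String replicatorPrefix, String localCluster, String remoteCluster) {
        return replicatorPrefix + "." + localCluster + REPL_PRODUCER_NAME_DELIMITER + remoteCluster;
    }

    public static void main(String[] args) {
        System.out.println(producerName("pulsar.repl", "cluster-a", "cluster-b"));
        // prints: pulsar.repl.cluster-a-->cluster-b
    }
}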
@@ -85,8 +93,8 @@ public void sendMessage(Entry entry) {
         try {
             msg = MessageImpl.deserializeSkipBrokerEntryMetaData(headersAndPayload);
         } catch (Throwable t) {
-            log.error("[{}][{} -> {}] Failed to deserialize message at {} (buffer size: {}): {}", topicName,
-                    localCluster, remoteCluster, entry.getPosition(), length, t.getMessage(), t);
+            log.error("[{}] Failed to deserialize message at {} (buffer size: {}): {}", replicatorId,
+                    entry.getPosition(), length, t.getMessage(), t);
             entry.release();
             return;
         }
@@ -100,8 +108,8 @@

         if (msg.hasReplicateTo() && !msg.getReplicateTo().contains(remoteCluster)) {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{} -> {}] Skipping message at {} / msg-id: {}: replicateTo {}", topicName,
-                        localCluster, remoteCluster, entry.getPosition(), msg.getMessageId(), msg.getReplicateTo());
+                log.debug("[{}] Skipping message at {} / msg-id: {}: replicateTo {}", replicatorId,
+                        entry.getPosition(), msg.getMessageId(), msg.getReplicateTo());
             }
             entry.release();
             msg.recycle();
@@ -118,8 +126,8 @@ public void sendMessage(Entry entry) {

         } else {
             if (log.isDebugEnabled()) {
-                log.debug("[{}][{} -> {}] dropping message because replicator producer is not started/writable",
-                        topicName, localCluster, remoteCluster);
+                log.debug("[{}] dropping message because replicator producer is not started/writable",
+                        replicatorId);
             }
             msgDrop.recordEvent();
             entry.release();
@@ -167,12 +175,10 @@ private static final class ProducerSendCallback implements SendCallback {
         @Override
         public void sendComplete(Exception exception) {
             if (exception != null) {
-                log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
-                        replicator.localCluster, replicator.remoteCluster, exception);
+                log.error("[{}] Error producing on remote broker", replicator.replicatorId, exception);
             } else {
                 if (log.isDebugEnabled()) {
-                    log.debug("[{}][{} -> {}] Message persisted on remote broker", replicator.topicName,
-                            replicator.localCluster, replicator.remoteCluster);
+                    log.debug("[{}] Message persisted on remote broker", replicator.replicatorId);
                 }
             }
             entry.release();
