Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix avoid creating new topic after migration is started #21368

Merged
merged 2 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,11 @@ public Optional<ClusterUrl> getMigratedClusterUrl() {
return getMigratedClusterUrl(brokerService.getPulsar(), topic);
}

public static CompletableFuture<Boolean> isClusterMigrationEnabled(PulsarService pulsar,
String topic) {
return getMigratedClusterUrlAsync(pulsar, topic).thenApply(url -> url.isPresent());
}

public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar,
String topic) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
Expand Down Expand Up @@ -1521,6 +1522,12 @@ protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final S
topicFuture.handle((persistentTopic, ex) -> {
// release permit and process pending topic
topicLoadSemaphore.release();
// do not recreate topic if topic is already migrated and deleted by broker
// so, avoid creating a new topic if migration is already started
if (ex != null && (ex.getCause() instanceof TopicMigratedException)) {
topicFuture.completeExceptionally(ex.getCause());
return null;
}
createPendingLoadTopic();
return null;
});
Expand Down Expand Up @@ -1632,7 +1639,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing,
? checkMaxTopicsPerNamespace(topicName, 1)
: CompletableFuture.completedFuture(null);

maxTopicsCheck.thenCompose(__ -> getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> {
CompletableFuture<Void> isTopicAlreadyMigrated = checkTopicAlreadyMigrated(topicName);

maxTopicsCheck.thenCompose(__ -> isTopicAlreadyMigrated).thenCompose(__ -> getManagedLedgerConfig(topicName))
.thenAccept(managedLedgerConfig -> {
if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) {
// init managedLedger interceptor
Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>();
Expand Down Expand Up @@ -1760,6 +1770,20 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
});
}

private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName topicName) {
CompletableFuture<Void> result = new CompletableFuture<>();
AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> {
if (isMigrated) {
result.completeExceptionally(
new BrokerServiceException.TopicMigratedException(topicName + " already migrated"));
} else {
result.complete(null);
}
return null;
});
return result;
}

public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull TopicName topicName) {
requireNonNull(topicName);
NamespaceName namespace = topicName.getNamespaceObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,24 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(),
topicName.toString());
if (clusterURL.isPresent()) {
log.info("[{}] redirect migrated consumer to topic {}: "
+ "consumerId={}, subName={}, {}", remoteAddress,
topicName, consumerId, subscriptionName, exception.getCause().getMessage());
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Consumer, consumerId,
clusterURL.get().getBrokerServiceUrl(),
clusterURL.get().getBrokerServiceUrlTls());
if (!msgSent) {
log.info("consumer client doesn't support topic migration handling {}-{}-{}",
topicName, remoteAddress, consumerId);
}
consumers.remove(consumerId, consumerFuture);
closeConsumer(consumerId);
return null;
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}",
remoteAddress, topicName, subscriptionName,
Expand Down Expand Up @@ -1567,6 +1585,22 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
producers.remove(producerId, producerFuture);
return null;
} else if (cause instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(), topicName.toString());
if (clusterURL.isPresent()) {
log.info("[{}] redirect migrated producer to topic {}: "
+ "producerId={}, producerName = {}, {}", remoteAddress,
topicName, producerId, producerName, cause.getMessage());
boolean msgSent = commandSender.sendTopicMigrated(ResourceType.Producer, producerId,
clusterURL.get().getBrokerServiceUrl(), clusterURL.get().getBrokerServiceUrlTls());
if (!msgSent) {
log.info("client doesn't support topic migration handling {}-{}-{}", topicName,
remoteAddress, producerId);
}
producers.remove(producerId, producerFuture);
closeProducer(producerId, -1L);
return null;
}
}

// Do not print stack traces for expected exceptions
Expand Down Expand Up @@ -2986,15 +3020,18 @@ protected void interceptCommand(BaseCommand command) throws InterceptException {
public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to producer
safelyRemoveProducer(producer);
closeProducer(producer.getProducerId(), producer.getEpoch());

}

public void closeProducer(long producerId, long epoch) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
writeAndFlush(Commands.newCloseProducer(producerId, -1L));
// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
// because they are expected. Once the interval has passed, the client should have received the
// CloseProducer command and should not send any additional messages until it sends a create Producer
// command.
final long epoch = producer.getEpoch();
final long producerId = producer.getProducerId();
recentlyClosedProducers.put(producerId, epoch);
ctx.executor().schedule(() -> {
recentlyClosedProducers.remove(producerId, epoch);
Expand All @@ -3009,8 +3046,12 @@ public void closeProducer(Producer producer) {
public void closeConsumer(Consumer consumer) {
// removes consumer-connection from map and send close command to consumer
safelyRemoveConsumer(consumer);
closeConsumer(consumer.consumerId());
}

public void closeConsumer(long consumerId) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
} else {
close();
}
Expand Down Expand Up @@ -3529,4 +3570,4 @@ protected AuthenticationState getOriginalAuthState() {
protected void setAuthRole(String authRole) {
this.authRole = authRole;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,8 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
String newTopicName = topicName + "-new";
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar1.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) -> pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
Expand Down Expand Up @@ -398,7 +398,7 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
AbstractTopic topicDiff = (AbstractTopic) pulsar2.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
assertNotNull(topicDiff);
for (int i = 0; i < n; i++) {
producerDiff.send("diff".getBytes());
Expand Down Expand Up @@ -603,6 +603,39 @@ public void testClusterMigrationWithResourceCreated() throws Exception {

consumer1.close();
producer1.close();

// publish to new topic which should be redirected immediately
String newTopic = topicName+"-new";
producer1 = client1.newProducer().topic(newTopic).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
retryStrategically((test) -> {
try {
pulsar2.getBrokerService().getTopic(newTopic, false).getNow(null).get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);
PersistentTopic pulsar2Topic = (PersistentTopic) pulsar2.getBrokerService().getTopic(newTopic, false).getNow(null)
.get();
retryStrategically((test) -> {
try {
return !pulsar2Topic.getProducers().isEmpty();
} catch (Exception e) {
return false;
}
}, 10, 500);
assertFalse(pulsar2Topic.getProducers().isEmpty());
consumer1 = client1.newConsumer().topic(newTopic).subscriptionName("s1").subscribe();
retryStrategically((test) -> {
try {
return !pulsar2Topic.getSubscription("s1").getConsumers().isEmpty();
} catch (Exception e) {
return false;
}
}, 10, 500);
assertFalse(pulsar2Topic.getSubscription("s1").getConsumers().isEmpty());
}

@Test(dataProvider = "NamespaceMigrationTopicSubscriptionTypes")
Expand Down Expand Up @@ -803,7 +836,7 @@ public void testNamespaceMigration(boolean persistent, SubscriptionType subType,
consumerM = client1.newConsumer().topic(newTopicName).subscriptionType(subType)
.subscriptionName("sM").subscribe();
retryStrategically((t) -> pulsar1.getBrokerService().getTopicReference(newTopicName).isPresent(), 5, 100);
pulsar1.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
pulsar2.getBrokerService().getTopicReference(newTopicName).get().checkClusterMigration().get();
retryStrategically((t) ->
pulsar2.getBrokerService().getTopicReference(newTopicName).isPresent() &&
pulsar2.getBrokerService().getTopicReference(newTopicName).get().getSubscription("sM")
Expand Down Expand Up @@ -832,7 +865,7 @@ public void testNamespaceMigration(boolean persistent, SubscriptionType subType,
.subscriptionName("s1-d").subscribe();
Producer<byte[]> producerDiff = client1.newProducer().topic(diffTopic).enableBatching(false)
.producerName("cluster1-d").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
AbstractTopic topicDiff = (AbstractTopic) pulsar1.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
AbstractTopic topicDiff = (AbstractTopic) pulsar2.getBrokerService().getTopic(diffTopic, false).getNow(null).get();
assertNotNull(topicDiff);
for (int i = 0; i < n; i++) {
producerDiff.send("diff".getBytes());
Expand Down
Loading