Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker]Support to migrate topics from blue to green cluster per namespace #21367

Merged
merged 1 commit into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2642,4 +2642,18 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
return null;
});
}

protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.isMigrated = migrated;
return policies;
});
log.info("Successfully updated migration on namespace {}", namespaceName);
} catch (Exception e) {
log.error("Failed to update migration on namespace {}", namespaceName, e);
throw new RestException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1709,5 +1709,18 @@ public void setSchemaAutoUpdateCompatibilityStrategy(@PathParam("tenant") String
internalSetSchemaAutoUpdateCompatibilityStrategy(strategy);
}

@POST
@Path("/{property}/{cluster}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void enableMigration(@PathParam("property") String property,
@PathParam("cluster") String cluster,
@PathParam("namespace") String namespace,
boolean migrated) {
validateNamespaceName(property, cluster, namespace);
internalEnableMigration(migrated);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,17 @@ public void removeNamespaceEntryFilters(@Suspended AsyncResponse asyncResponse,
});
}


@POST
@Path("/{tenant}/{namespace}/migration")
@ApiOperation(hidden = true, value = "Update migration for all topics in a namespace")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Property or cluster or namespace doesn't exist") })
public void enableMigration(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
boolean migrated) {
validateNamespaceName(tenant, namespace);
internalEnableMigration(migrated);
}

private static final Logger log = LoggerFactory.getLogger(Namespaces.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1349,19 +1349,29 @@ public void updateBrokerSubscribeRate() {
}

public Optional<ClusterUrl> getMigratedClusterUrl() {
return getMigratedClusterUrl(brokerService.getPulsar());
return getMigratedClusterUrl(brokerService.getPulsar(), topic);
}

public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar) {
public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync(PulsarService pulsar,
String topic) {
return pulsar.getPulsarResources().getClusterResources().getClusterAsync(pulsar.getConfig().getClusterName())
.thenApply(clusterData -> (clusterData.isPresent() && clusterData.get().isMigrated())
.thenCombine(isNamespaceMigrationEnabledAsync(pulsar, topic),
((clusterData, isNamespaceMigrationEnabled)
-> ((clusterData.isPresent() && clusterData.get().isMigrated())
|| isNamespaceMigrationEnabled)
? Optional.ofNullable(clusterData.get().getMigratedClusterUrl())
: Optional.empty());
: Optional.empty()));
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar) {
private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
return pulsar.getPulsarResources().getNamespaceResources().
getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(policies -> policies.isPresent() && policies.get().isMigrated);
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, String topic) {
try {
return getMigratedClusterUrlAsync(pulsar)
return getMigratedClusterUrlAsync(pulsar, topic)
.get(pulsar.getPulsarResources().getClusterResources().getOperationTimeoutSec(), TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to get migration cluster URL", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,8 @@ public void topicMigrated(Optional<ClusterUrl> clusterUrl) {

public boolean checkAndApplyTopicMigration() {
if (subscription.isSubsciptionMigrated()) {
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar());
Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar(),
topicName);
if (clusterUrl.isPresent()) {
ClusterUrl url = clusterUrl.get();
cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1644,7 +1644,7 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ
producers.remove(producerId, producerFuture);
}).exceptionallyAsync(ex -> {
if (ex.getCause() instanceof BrokerServiceException.TopicMigratedException) {
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar());
Optional<ClusterUrl> clusterURL = getMigratedClusterUrl(service.getPulsar(), topic.getName());
if (clusterURL.isPresent()) {
if (topic.isReplicationBacklogExist()) {
log.info("Topic {} is migrated but replication backlog exist: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,8 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
}

private CompletableFuture<Void> updateClusterMigrated() {
return getMigratedClusterUrlAsync(brokerService.getPulsar()).thenAccept(url -> migrated = url.isPresent());
}

private Optional<ClusterUrl> getClusterMigrationUrl() {
return getMigratedClusterUrl(brokerService.getPulsar());
return getMigratedClusterUrlAsync(brokerService.getPulsar(), topic)
.thenAccept(url -> migrated = url.isPresent());
}

public CompletableFuture<Void> initialize() {
Expand Down Expand Up @@ -332,7 +329,7 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
false, cnx, cnx.getAuthRole(), metadata, readCompacted, keySharedMeta, MessageId.latest,
DEFAULT_CONSUMER_EPOCH, schemaType);
if (isMigrated()) {
consumer.topicMigrated(getClusterMigrationUrl());
consumer.topicMigrated(getMigratedClusterUrl());
}

addConsumerToSubscription(subscription, consumer).thenRun(() -> {
Expand Down Expand Up @@ -949,7 +946,7 @@ public boolean isActive() {

@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> url = getClusterMigrationUrl();
Optional<ClusterUrl> url = getMigratedClusterUrl();
if (url.isPresent()) {
this.migrated = true;
producers.forEach((__, producer) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2588,6 +2588,7 @@ private boolean hasBacklogs() {
@Override
public CompletableFuture<Void> checkClusterMigration() {
Optional<ClusterUrl> clusterUrl = getMigratedClusterUrl();

if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -2614,13 +2615,13 @@ private CompletableFuture<Void> initMigration() {
return CompletableFuture.completedFuture(null);
}
log.info("{} initializing subscription created at migration cluster", topic);
return getMigratedClusterUrlAsync(getBrokerService().getPulsar()).thenCompose(clusterUrl -> {
return getMigratedClusterUrlAsync(getBrokerService().getPulsar(), topic).thenCompose(clusterUrl -> {
if (!brokerService.getPulsar().getConfig().isClusterMigrationAutoResourceCreation()) {
return CompletableFuture.completedFuture(null);
}
if (!clusterUrl.isPresent()) {
return FutureUtil
.failedFuture(new TopicMigratedException("cluster migration service-url is not configired"));
.failedFuture(new TopicMigratedException("cluster migration service-url is not configured"));
}
ClusterUrl url = clusterUrl.get();
ClusterData clusterData = ClusterData.builder().serviceUrl(url.getServiceUrl())
Expand Down
Loading