Skip to content

Commit

Permalink
[feat] [broker] PIP-188 Add support to auto create topic resources in…
Browse files Browse the repository at this point in the history
…to green cluster before migration [part-3] (apache#21354)
  • Loading branch information
rdhabalia authored and vraulji committed Oct 16, 2023
1 parent 9231a81 commit 3cd07b4
Show file tree
Hide file tree
Showing 8 changed files with 232 additions and 15 deletions.
4 changes: 4 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,10 @@ aggregatePublisherStatsByProducerName=false
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

# Flag to start cluster migration for topic only after creating all topic's resources
# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled).
clusterMigrationAutoResourceCreation=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
8 changes: 8 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,14 @@ splitTopicAndPartitionLabelInPrometheus=false
# Otherwise, aggregate it by list index.
aggregatePublisherStatsByProducerName=false

# Interval between checks to see if cluster is migrated and marks topic migrated
# if cluster is marked migrated. Disable with value 0. (Default disabled).
clusterMigrationCheckDurationSeconds=0

# Flag to start cluster migration for topic only after creating all topic's resources
# such as tenant, namespaces, subscriptions at new green cluster. (Default disabled).
clusterMigrationAutoResourceCreation=false

### --- Schema storage --- ###
# The schema storage implementation used by this broker.
schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2778,6 +2778,13 @@ The delayed message index time step(in seconds) in per bucket snapshot segment,
)
private int clusterMigrationCheckDurationSeconds = 0;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Flag to start cluster migration for topic only after creating all topic's resources"
+ " such as tenant, namespaces, subscriptions at new green cluster. (Default disabled)."
)
private boolean clusterMigrationAutoResourceCreation = false;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "Enforce schema validation on following cases:\n\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -103,6 +104,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicMigratedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedSubscriptionException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
Expand All @@ -125,6 +127,8 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
Expand All @@ -137,10 +141,12 @@
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
Expand Down Expand Up @@ -206,6 +212,9 @@ public static boolean isDedupCursorName(String name) {

private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;

private static final String MIGRATION_CLUSTER_NAME = "migration-cluster";
private volatile boolean migrationSubsCreated = false;

// topic has every published chunked message since topic is loaded
public boolean msgChunkPublished;

Expand Down Expand Up @@ -2582,16 +2591,110 @@ public CompletableFuture<Void> checkClusterMigration() {
if (!clusterUrl.isPresent()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate() :
CompletableFuture.completedFuture(null);
return migrated.thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
}).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
return initMigration().thenCompose(subCreated -> {
migrationSubsCreated = true;
CompletableFuture<?> migrated = !isMigrated() ? ledger.asyncMigrate()
: CompletableFuture.completedFuture(null);
return migrated.thenApply(__ -> {
subscriptions.forEach((name, sub) -> {
if (sub.isSubsciptionMigrated()) {
sub.getConsumers().forEach(Consumer::checkAndApplyTopicMigration);
}
});
return null;
}).thenCompose(__ -> checkAndDisconnectReplicators()).thenCompose(__ -> checkAndUnsubscribeSubscriptions());
});
}

/**
* Initialize migration for a topic by creating topic's resources at migration cluster.
*/
private CompletableFuture<Void> initMigration() {
if (migrationSubsCreated) {
return CompletableFuture.completedFuture(null);
}
log.info("{} initializing subscription created at migration cluster", topic);
return getMigratedClusterUrlAsync(getBrokerService().getPulsar()).thenCompose(clusterUrl -> {
if (!brokerService.getPulsar().getConfig().isClusterMigrationAutoResourceCreation()) {
return CompletableFuture.completedFuture(null);
}
if (!clusterUrl.isPresent()) {
return FutureUtil
.failedFuture(new TopicMigratedException("cluster migration service-url is not configired"));
}
ClusterUrl url = clusterUrl.get();
ClusterData clusterData = ClusterData.builder().serviceUrl(url.getServiceUrl())
.serviceUrlTls(url.getServiceUrlTls()).brokerServiceUrl(url.getBrokerServiceUrl())
.brokerServiceUrlTls(url.getBrokerServiceUrlTls()).build();
PulsarAdmin admin = getBrokerService().getClusterPulsarAdmin(MIGRATION_CLUSTER_NAME,
Optional.of(clusterData));

// namespace creation
final String tenant = TopicName.get(topic).getTenant();
final NamespaceName ns = TopicName.get(topic).getNamespaceObject();
List<CompletableFuture<Void>> subResults = new ArrayList<>();

return brokerService.getPulsar().getPulsarResources().getTenantResources().getTenantAsync(tenant)
.thenCompose(tenantInfo -> {
if (!tenantInfo.isPresent()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> ts = new CompletableFuture<>();
admin.tenants().createTenantAsync(tenant, tenantInfo.get()).handle((__, ex) -> {
if (ex == null || ex instanceof ConflictException) {
log.info("[{}] successfully created tenant {} for migration", topic, tenant);
ts.complete(null);
return null;
}
log.warn("[{}] Failed to create tenant {} on migration cluster {}", topic, tenant,
ex.getCause().getMessage());
ts.completeExceptionally(ex.getCause());
return null;
});
return ts;
}).thenCompose(t -> {
return brokerService.getPulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(ns).thenCompose(policies -> {
if (!policies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> nsFuture = new CompletableFuture<>();
admin.namespaces().createNamespaceAsync(ns.toString(), policies.get())
.handle((__, ex) -> {
if (ex == null || ex instanceof ConflictException) {
log.info("[{}] successfully created namespace {} for migration",
topic, ns);
nsFuture.complete(null);
return null;
}
log.warn("[{}] Failed to create namespace {} on migration cluster {}",
topic, ns, ex.getCause().getMessage());
nsFuture.completeExceptionally(ex.getCause());
return null;
});
return nsFuture;
}).thenCompose(p -> {
subscriptions.forEach((subName, sub) -> {
CompletableFuture<Void> subResult = new CompletableFuture<>();
subResults.add(subResult);
admin.topics().createSubscriptionAsync(topic, subName, MessageId.earliest)
.handle((__, ex) -> {
if (ex == null || ex instanceof ConflictException) {
log.info("[{}] successfully created sub {} for migration",
topic, subName);
subResult.complete(null);
return null;
}
log.warn("[{}] Failed to create sub {} on migration cluster, {}",
topic, subName, ex.getCause().getMessage());
subResult.completeExceptionally(ex.getCause());
return null;
});
});
return Futures.waitForAll(subResults);
});
});
});
}

private CompletableFuture<Void> checkAndUnsubscribeSubscriptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@

import java.lang.reflect.Method;
import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -267,7 +270,8 @@ public void testClusterMigration(boolean persistent, SubscriptionType subType) t
AbstractTopic topic2 = (AbstractTopic) pulsar2.getBrokerService().getTopic(topicName, false).getNow(null).get();
assertFalse(topic2.getProducers().isEmpty());

ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);

retryStrategically((test) -> {
Expand Down Expand Up @@ -454,7 +458,8 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc

retryStrategically((test) -> topic2.getReplicators().size() == 1, 10, 2000);
log.info("replicators should be ready");
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);
log.info("update cluster migration called");
retryStrategically((test) -> {
Expand Down Expand Up @@ -494,6 +499,85 @@ public void testClusterMigrationWithReplicationBacklog(boolean persistent, Subsc
assertEquals(topic2.getProducers().size(), 2);
}

/**
* This test validates that blue cluster first creates list of subscriptions into green cluster so, green cluster
* will not lose the data if producer migrates.
*
* @throws Exception
*/
@Test
public void testClusterMigrationWithResourceCreated() throws Exception {
log.info("--- Starting testClusterMigrationWithResourceCreated ---");

String tenant = "pulsar2";
String namespace = tenant + "/migration";
String greenClusterName = pulsar2.getConfig().getClusterName();
String blueClusterName = pulsar1.getConfig().getClusterName();
admin1.clusters().createCluster(greenClusterName,
ClusterData.builder().serviceUrl(url2.toString()).serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls()).build());
admin2.clusters().createCluster(blueClusterName,
ClusterData.builder().serviceUrl(url1.toString()).serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls()).build());

admin1.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", greenClusterName)));
// broker should handle already tenant creation
admin2.tenants().createTenant(tenant, new TenantInfoImpl(Sets.newHashSet("appid1", "appid2", "appid3"),
Sets.newHashSet("r1", greenClusterName)));
admin1.namespaces().createNamespace(namespace, Sets.newHashSet("r1", greenClusterName));

final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/migrationTopic");

broker1.getPulsarService().getConfig().setClusterMigrationAutoResourceCreation(true);
@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
// cluster-1 producer/consumer
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).enableBatching(false)
.producerName("cluster1-1").messageRoutingMode(MessageRoutingMode.SinglePartition).create();
// create subscriptions
admin1.topics().createSubscription(topicName, "s1", MessageId.earliest);
admin1.topics().createSubscription(topicName, "s2", MessageId.earliest);

ClusterUrl migratedUrl = new ClusterUrl(pulsar2.getWebServiceAddress(), pulsar2.getWebServiceAddressTls(),
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", true, migratedUrl);

PersistentTopic topic1 = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName, false).getNow(null)
.get();
retryStrategically((test) -> {
try {
topic1.checkClusterMigration().get();
return true;
} catch (Exception e) {
// ok
}
return false;
}, 10, 500);

assertNotNull(admin2.tenants().getTenantInfo(tenant));
assertNotNull(admin2.namespaces().getPolicies(namespace));
List<String> subLists = admin2.topics().getSubscriptions(topicName);
assertTrue(subLists.contains("s1"));
assertTrue(subLists.contains("s2"));

int n = 5;
for (int i = 0; i < n; i++) {
producer1.send("test1".getBytes());
}

Consumer<byte[]> consumer1 = client1.newConsumer().topic(topicName).subscriptionName("s1").subscribe();
for (int i = 0; i < n; i++) {
assertNotNull(consumer1.receive());
}

consumer1.close();
producer1.close();
}

static class TestBroker extends MockedPulsarServiceBaseTest {

private String clusterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,14 @@ static Builder builder() {
@NoArgsConstructor
@AllArgsConstructor
class ClusterUrl {
String serviceUrl;
String serviceUrlTls;
String brokerServiceUrl;
String brokerServiceUrlTls;

public boolean isEmpty() {
return brokerServiceUrl == null && brokerServiceUrlTls == null;
return serviceUrl != null && serviceUrlTls != null && brokerServiceUrl == null
&& brokerServiceUrlTls == null;
}
}
}
Loading

0 comments on commit 3cd07b4

Please sign in to comment.