Skip to content

Commit

Permalink
test fix 4/12
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Apr 13, 2024
1 parent a6a63a1 commit 902c6f0
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 27 deletions.
2 changes: 1 addition & 1 deletion conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ managedLedgerTraceTaskExecution=true

### --- Load balancer --- ###

loadManagerClassName=org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl
loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager

# Enable load balancer
loadBalancerEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public synchronized void start() throws PulsarServerException {
log.info("Closed the channel producer.");
}
}

PulsarClusterMetadataSetup.createTenantIfAbsent
(pulsar.getPulsarResources(), SYSTEM_NAMESPACE.getTenant(), config.getClusterName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,11 @@ public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle,
}

public CompletableFuture<Boolean> isNamespaceBundleOwned(NamespaceBundle bundle) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
ExtensibleLoadManagerImpl extensibleLoadManager = ExtensibleLoadManagerImpl.get(loadManager.get());
return extensibleLoadManager.getOwnershipAsync(Optional.empty(), bundle)
.thenApply(Optional::isPresent);
}
return pulsar.getLocalMetadataStore().exists(ServiceUnitUtils.path(bundle));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1789,19 +1789,15 @@ private CompletableFuture<Void> checkTopicAlreadyMigrated(TopicName topicName) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> result = new CompletableFuture<>();
if (ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
result.complete(null);
} else {
AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> {
if (isMigrated) {
result.completeExceptionally(
new BrokerServiceException.TopicMigratedException(topicName + " already migrated"));
} else {
result.complete(null);
}
return null;
});
}
AbstractTopic.isClusterMigrationEnabled(pulsar, topicName.toString()).handle((isMigrated, ex) -> {
if (isMigrated) {
result.completeExceptionally(
new BrokerServiceException.TopicMigratedException(topicName + " already migrated"));
} else {
result.complete(null);
}
return null;
});
return result;
}

Expand Down Expand Up @@ -2081,12 +2077,7 @@ public void checkGC() {
}

public void checkClusterMigration() {
topics.forEach((n, t) -> {
Optional<Topic> topic = extractTopic(t);
if (topic.isPresent() && !ExtensibleLoadManagerImpl.isInternalTopic(topic.get().getName())) {
topic.ifPresent(Topic::checkClusterMigration);
}
});
forEachTopic(Topic::checkClusterMigration);
}

public void checkMessageExpiry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,8 @@ public CompletableFuture<Void> stopReplProducers() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1704,7 +1704,8 @@ CompletableFuture<Void> checkPersistencePolicies() {
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)
|| ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2095,10 +2095,11 @@ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
startBroker();

String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),

updateTenant(SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw")));

admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
setupSystemNamespace();
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(systemTopic).create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,14 @@ protected void setupSystemNamespace() throws Exception {
}
}

protected void updateTenant(String tenant, TenantInfoImpl tenantInfo) throws Exception {
if (!admin.tenants().getTenants().contains(tenant)) {
admin.tenants().createTenant(tenant, tenantInfo);
} else {
admin.tenants().updateTenant(tenant, tenantInfo);
}
}


protected Object asyncRequests(Consumer<TestAsyncResponse> function) throws Exception {
TestAsyncResponse ctx = new TestAsyncResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -55,8 +56,10 @@ public void cleanup() throws Exception {

@Test
public void checkLoadReportNicSpeed() throws Exception {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
return;
}
// Since we have overridden the NIC speed in the configuration, the load report for the broker should always

LoadManagerReport report = admin.brokerStats().getLoadReport();

if (SystemUtils.IS_OS_LINUX) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ protected void setup() throws Exception {
conf.setAuthenticationProviders(ImmutableSet.of(AuthenticationProviderTls.class.getName()));
conf.setNumExecutorThreadPoolSize(5);

conf.setBrokerClientTlsEnabled(true);
conf.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
conf.setBrokerClientAuthenticationParameters(
String.format("tlsCertFile:%s,tlsKeyFile:%s",
getTlsFileForClient("admin.cert"), getTlsFileForClient("admin.key-pk8")));
conf.setBrokerClientTrustCertsFilePath(CA_CERT_FILE_PATH);

super.internalSetup();

// start proxy service
Expand Down Expand Up @@ -135,7 +142,7 @@ public void testAuthenticatedProxyAsAdmin() throws Exception {
adminAdmin.tenants().createTenant("tenant1",
new TenantInfoImpl(ImmutableSet.of("randoUser"),
ImmutableSet.of(configClusterName)));
Assert.assertEquals(ImmutableSet.of("tenant1"), adminAdmin.tenants().getTenants());
Assert.assertTrue(adminAdmin.tenants().getTenants().contains("tenant1"));
}
}

Expand Down

0 comments on commit 902c6f0

Please sign in to comment.